001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.lang.reflect.InvocationTargetException; 024import java.net.BindException; 025import java.net.InetSocketAddress; 026import java.net.UnknownHostException; 027import java.nio.ByteBuffer; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.HashMap; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.Map.Entry; 036import java.util.NavigableMap; 037import java.util.Set; 038import java.util.TreeSet; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ConcurrentMap; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicBoolean; 043import java.util.concurrent.atomic.AtomicLong; 044import java.util.concurrent.atomic.LongAdder; 045import org.apache.commons.lang3.mutable.MutableObject; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.Path; 048import 
org.apache.hadoop.hbase.ByteBufferExtendedCell; 049import org.apache.hadoop.hbase.CacheEvictionStats; 050import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 051import org.apache.hadoop.hbase.Cell; 052import org.apache.hadoop.hbase.CellScannable; 053import org.apache.hadoop.hbase.CellScanner; 054import org.apache.hadoop.hbase.CellUtil; 055import org.apache.hadoop.hbase.CompareOperator; 056import org.apache.hadoop.hbase.DoNotRetryIOException; 057import org.apache.hadoop.hbase.DroppedSnapshotException; 058import org.apache.hadoop.hbase.HBaseIOException; 059import org.apache.hadoop.hbase.HConstants; 060import org.apache.hadoop.hbase.MultiActionResultTooLarge; 061import org.apache.hadoop.hbase.NotServingRegionException; 062import org.apache.hadoop.hbase.PrivateCellUtil; 063import org.apache.hadoop.hbase.RegionTooBusyException; 064import org.apache.hadoop.hbase.Server; 065import org.apache.hadoop.hbase.ServerName; 066import org.apache.hadoop.hbase.TableName; 067import org.apache.hadoop.hbase.UnknownScannerException; 068import org.apache.hadoop.hbase.client.Append; 069import org.apache.hadoop.hbase.client.ConnectionUtils; 070import org.apache.hadoop.hbase.client.Delete; 071import org.apache.hadoop.hbase.client.Durability; 072import org.apache.hadoop.hbase.client.Get; 073import org.apache.hadoop.hbase.client.Increment; 074import org.apache.hadoop.hbase.client.Mutation; 075import org.apache.hadoop.hbase.client.Put; 076import org.apache.hadoop.hbase.client.RegionInfo; 077import org.apache.hadoop.hbase.client.RegionReplicaUtil; 078import org.apache.hadoop.hbase.client.Result; 079import org.apache.hadoop.hbase.client.Row; 080import org.apache.hadoop.hbase.client.RowMutations; 081import org.apache.hadoop.hbase.client.Scan; 082import org.apache.hadoop.hbase.client.TableDescriptor; 083import org.apache.hadoop.hbase.client.VersionInfoUtil; 084import org.apache.hadoop.hbase.conf.ConfigurationObserver; 085import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 
086import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 087import org.apache.hadoop.hbase.exceptions.ScannerResetException; 088import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 089import org.apache.hadoop.hbase.filter.ByteArrayComparable; 090import org.apache.hadoop.hbase.io.TimeRange; 091import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; 092import org.apache.hadoop.hbase.ipc.HBaseRpcController; 093import org.apache.hadoop.hbase.ipc.PriorityFunction; 094import org.apache.hadoop.hbase.ipc.QosPriority; 095import org.apache.hadoop.hbase.ipc.RpcCallContext; 096import org.apache.hadoop.hbase.ipc.RpcCallback; 097import org.apache.hadoop.hbase.ipc.RpcScheduler; 098import org.apache.hadoop.hbase.ipc.RpcServer; 099import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 100import org.apache.hadoop.hbase.ipc.RpcServerFactory; 101import org.apache.hadoop.hbase.ipc.RpcServerInterface; 102import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 103import org.apache.hadoop.hbase.ipc.ServerRpcController; 104import org.apache.hadoop.hbase.log.HBaseMarkers; 105import org.apache.hadoop.hbase.master.MasterRpcServices; 106import org.apache.hadoop.hbase.net.Address; 107import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; 108import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 109import org.apache.hadoop.hbase.quotas.OperationQuota; 110import org.apache.hadoop.hbase.quotas.QuotaUtil; 111import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 112import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 113import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 114import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 115import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; 116import org.apache.hadoop.hbase.regionserver.Leases.Lease; 117import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; 118import 
org.apache.hadoop.hbase.regionserver.Region.Operation; 119import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 120import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 121import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 122import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 123import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 124import org.apache.hadoop.hbase.security.Superusers; 125import org.apache.hadoop.hbase.security.User; 126import org.apache.hadoop.hbase.security.access.AccessChecker; 127import org.apache.hadoop.hbase.security.access.Permission; 128import org.apache.hadoop.hbase.util.Bytes; 129import org.apache.hadoop.hbase.util.DNS; 130import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 131import org.apache.hadoop.hbase.util.Pair; 132import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 133import org.apache.hadoop.hbase.util.Strings; 134import org.apache.hadoop.hbase.wal.WAL; 135import org.apache.hadoop.hbase.wal.WALEdit; 136import org.apache.hadoop.hbase.wal.WALKey; 137import org.apache.hadoop.hbase.wal.WALSplitter; 138import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 139import org.apache.yetus.audience.InterfaceAudience; 140import org.slf4j.Logger; 141import org.slf4j.LoggerFactory; 142 143import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 144import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 145import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 146import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 147import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 148import org.apache.hbase.thirdparty.com.google.protobuf.Message; 149import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 150import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 151import 
org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 152import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 153import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 154 155import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 156import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 157import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 174import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 194import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 215import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 236import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 237import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 238import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 239import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 240import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 241 242/** 243 * Implements the regionserver RPC services. 244 */ 245@InterfaceAudience.Private 246@SuppressWarnings("deprecation") 247public class RSRpcServices implements HBaseRPCErrorHandler, 248 AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction, 249 ConfigurationObserver { 250 protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); 251 252 /** RPC scheduler to use for the region server. */ 253 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS = 254 "hbase.region.server.rpc.scheduler.factory.class"; 255 256 /** RPC scheduler to use for the master. */ 257 public static final String MASTER_RPC_SCHEDULER_FACTORY_CLASS = 258 "hbase.master.rpc.scheduler.factory.class"; 259 260 /** 261 * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This 262 * configuration exists to prevent the scenario where a time limit is specified to be so 263 * restrictive that the time limit is reached immediately (before any cells are scanned). 264 */ 265 private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 266 "hbase.region.server.rpc.minimum.scan.time.limit.delta"; 267 /** 268 * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA} 269 */ 270 private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; 271 272 /** 273 * Number of rows in a batch operation above which a warning will be logged. 
   */
  static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
  /**
   * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
   */
  static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;

  protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Server to handle client requests.
  final RpcServerInterface rpcServer;
  final InetSocketAddress isa;

  private final HRegionServer regionServer;
  private final long maxScannerResultSize;

  // The reference to the priority extraction function
  private final PriorityFunction priority;

  // Generates ids handed out to client scanners; initialized outside the constructor
  // (non-final), presumably during server startup -- TODO confirm where it is assigned.
  private ScannerIdGenerator scannerIdGenerator;
  // Live scanners keyed by scanner name; entries are removed on close or lease expiration.
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;
  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds)
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged
   */
  private final int rowSizeWarnThreshold;

  // Flag used by clearCompactionQueues RPC to reject concurrent clear requests.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  // We want to vet all accesses at the point of entry itself; limiting scope of access checker
  // instance to only this class to prevent its use from spreading deeper into implementation.
  // Initialized in start() since AccessChecker needs ZKWatcher which is created by HRegionServer
  // after RSRpcServices constructor and before start() is called.
  // Initialized only if authorization is enabled, else remains null.
  protected AccessChecker accessChecker;

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below
   * booleans to selectively enable/disable either Admin or Client Service (Rare is the case
   * where you would ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
      "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
      "hbase.regionserver.client.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
   */
  private static final class RegionScannerCloseCallBack implements RpcCallback {

    private final RegionScanner scanner;

    public RegionScannerCloseCallBack(RegionScanner scanner) {
      this.scanner = scanner;
    }

    @Override
    public void run() throws IOException {
      // Invoked by the RPC layer when the call completes; releases the scanner's resources.
      this.scanner.close();
    }
  }

  /**
   * An Rpc callback for doing shipped() call on a RegionScanner.
   */
  private class RegionScannerShippedCallBack implements RpcCallback {

    private final String scannerName;
    private final Shipper shipper;
    private final Lease lease;

    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
      this.scannerName = scannerName;
      this.shipper = shipper;
      this.lease = lease;
    }

    @Override
    public void run() throws IOException {
      this.shipper.shipped();
      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
      // Rpc call and we are at end of the call now. Time to add it back.
      // NOTE(review): check-then-act on the scanners map is not atomic; a concurrent close
      // between containsKey and addLease appears possible -- confirm this race is benign.
      if (scanners.containsKey(scannerName)) {
        if (lease != null) regionServer.leases.addLease(lease);
      }
    }
  }

  /**
   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
   * completion of multiGets.
   */
  static class RegionScannersCloseCallBack implements RpcCallback {
    private final List<RegionScanner> scanners = new ArrayList<>();

    public void addScanner(RegionScanner scanner) {
      this.scanners.add(scanner);
    }

    @Override
    public void run() {
      // Best-effort close of every registered scanner: a failure to close one scanner is
      // logged and does not prevent closing the rest.
      for (RegionScanner scanner : scanners) {
        try {
          scanner.close();
        } catch (IOException e) {
          LOG.error("Exception while closing the scanner " + scanner, e);
        }
      }
    }
  }

  /**
   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
   */
  private static final class RegionScannerHolder {

    // Sequence number the next scan request must present; see incNextCallSeq.
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    private final String scannerName;
    private final RegionScanner s;
    private final HRegion r;
    private final RpcCallback closeCallBack;
    private final RpcCallback shippedCallback;
    // Row of the last partial result returned, if any -- used by scan continuation logic.
    private byte[] rowOfLastPartialResult;
    // Whether the client asked for scan cursors to be returned.
    private boolean needCursor;

    public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r,
        RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) {
      this.scannerName = scannerName;
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
    }

    public long getNextCallSeq() {
      return nextCallSeq.get();
    }

    public boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan request running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }
  }

  /**
   * Instantiated as a scanner lease.
   * If the lease times out, the scanner is
   * closed
   */
  private class ScannerListener implements LeaseListener {
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // Remove-then-close: removing from the map first makes the expiration the single owner
      // of this scanner, so no concurrent RPC can revive it.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh != null) {
        RegionScanner s = rsh.s;
        LOG.info("Scanner " + this.scannerName + " lease expired on region "
            + s.getRegionInfo().getRegionNameAsString());
        HRegion region = null;
        try {
          region = regionServer.getRegion(s.getRegionInfo().getRegionName());
          // Coprocessor pre-hook runs before the close; failures are logged, not rethrown,
          // because the scanner must be closed regardless (see finally below).
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().preScannerClose(s);
          }
        } catch (IOException e) {
          LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
        } finally {
          try {
            s.close();
            // Post-hook only fires after a successful close.
            if (region != null && region.getCoprocessorHost() != null) {
              region.getCoprocessorHost().postScannerClose(s);
            }
          } catch (IOException e) {
            LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
          }
        }
      } else {
        LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
          " scanner found, hence no chance to close that related scanner!");
      }
    }
  }

  // Wraps a protobuf Result into a ResultOrException positioned at the given action index.
  private static ResultOrException getResultOrException(final ClientProtos.Result r,
      final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(r), index);
  }

  // Wraps an Exception into a ResultOrException positioned at the given action index.
  private static ResultOrException getResultOrException(final Exception e, final int index) {
    return getResultOrException(ResponseConverter.buildActionResult(e), index);
  }

  private static ResultOrException getResultOrException(
      final ResultOrException.Builder builder, final int index) {
    return builder.setIndex(index).build();
  }

  /**
   * Checks for the following pre-checks in order:
   * <ol>
   *
<li>RegionServer is running</li> 515 * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li> 516 * </ol> 517 * @param requestName name of rpc request. Used in reporting failures to provide context. 518 * @throws ServiceException If any of the above listed pre-check fails. 519 */ 520 private void rpcPreCheck(String requestName) throws ServiceException { 521 try { 522 checkOpen(); 523 requirePermission(requestName, Permission.Action.ADMIN); 524 } catch (IOException ioe) { 525 throw new ServiceException(ioe); 526 } 527 } 528 529 /** 530 * Starts the nonce operation for a mutation, if needed. 531 * @param mutation Mutation. 532 * @param nonceGroup Nonce group from the request. 533 * @return whether to proceed this mutation. 534 */ 535 private boolean startNonceOperation(final MutationProto mutation, long nonceGroup) 536 throws IOException { 537 if (regionServer.nonceManager == null || !mutation.hasNonce()) return true; 538 boolean canProceed = false; 539 try { 540 canProceed = regionServer.nonceManager.startOperation( 541 nonceGroup, mutation.getNonce(), regionServer); 542 } catch (InterruptedException ex) { 543 throw new InterruptedIOException("Nonce start operation interrupted"); 544 } 545 return canProceed; 546 } 547 548 /** 549 * Ends nonce operation for a mutation, if needed. 550 * @param mutation Mutation. 551 * @param nonceGroup Nonce group from the request. Always 0 in initial implementation. 552 * @param success Whether the operation for this nonce has succeeded. 
   */
  private void endNonceOperation(final MutationProto mutation,
      long nonceGroup, boolean success) {
    if (regionServer.nonceManager != null && mutation.hasNonce()) {
      regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
    }
  }

  // True only when the client on this connection advertised cell-block (codec) support,
  // i.e. cells can be shipped out-of-band rather than embedded in the protobuf response.
  private boolean isClientCellBlockSupport(RpcCallContext context) {
    return context != null && context.isClientCellBlockSupported();
  }

  /**
   * Adds a single mutation Result to a MutateResponse, shipping cells via the controller's
   * cell scanner when the client supports cell blocks, otherwise inlining them in the protobuf.
   * No-op when {@code result} is null.
   */
  private void addResult(final MutateResponse.Builder builder, final Result result,
      final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
    if (result == null) return;
    if (clientCellBlockSupported) {
      // Metadata-only protobuf Result; the actual cells travel in the RPC cell block.
      builder.setResult(ProtobufUtil.toResultNoData(result));
      rpcc.setCellScanner(result.cellScanner());
    } else {
      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
      builder.setResult(pbr);
    }
  }

  /**
   * Adds scan Results to a ScanResponse. Marks the response stale when the serving region is
   * not the default replica. As in addResult, cells go out via the cell block when supported,
   * with per-result cell counts and partial flags recorded in the response.
   */
  private void addResults(ScanResponse.Builder builder, List<Result> results,
      HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
    builder.setStale(!isDefaultRegion);
    if (results.isEmpty()) {
      return;
    }
    if (clientCellBlockSupported) {
      for (Result res : results) {
        builder.addCellsPerResult(res.size());
        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
      }
      controller.setCellScanner(CellUtil.createCellScanner(results));
    } else {
      for (Result res : results) {
        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
        builder.addResults(pbr);
      }
    }
  }

  /**
   * Mutate a list of rows atomically.
   * @param cellScanner if non-null, the mutation data -- the Cell content.
   */
  private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
    final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
    ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Tracks how many actions had their cells fully consumed from cellScanner, so the finally
    // block can skip the cells of any remaining actions and keep the scanner positioned.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }
      RowMutations rm = null;
      int i = 0;
      ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
          ClientProtos.ResultOrException.newBuilder();
      for (ClientProtos.Action action: actions) {
        // Only Put/Delete are legal inside an atomic row mutation.
        if (action.hasGet()) {
          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
            action.getGet());
        }
        MutationType type = action.getMutation().getMutateType();
        if (rm == null) {
          // All actions target the same row; the first action's row seeds the RowMutations.
          rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
        }
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            rm.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
            ++countOfCompleteMutation;
            // No cell-size check for deletes; only space-quota enforcement applies here.
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            rm.add(del);
            break;
          default:
            throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
        }
        // To unify the response format with doNonAtomicRegionMutation and read through client's
        // AsyncProcess we have to add an empty result instance per operation
        resultOrExceptionOrBuilder.clear();
        resultOrExceptionOrBuilder.setIndex(i++);
        builder.addResultOrException(
            resultOrExceptionOrBuilder.build());
      }
      return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   *
   * @return the Result to return to the client; never null -- when neither the region append
   *     nor a coprocessor produced a result, {@code Result.EMPTY_RESULT} is returned. A
   *     coprocessor preAppend hook may bypass the default operation by supplying the result.
   */
  private Result append(final HRegion region, final OperationQuota quota,
      final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
      ActivePolicyEnforcement spaceQuota)
      throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    Result r = null;
    if (region.getCoprocessorHost() != null) {
      // A non-null result here means the coprocessor bypassed the default append.
      r = region.getCoprocessorHost().preAppend(append);
    }
    if (r == null) {
      boolean canProceed = startNonceOperation(mutation, nonceGroup);
      boolean success = false;
      try {
        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
        if (canProceed) {
          r = region.append(append, nonceGroup, nonce);
        } else {
          // convert duplicate append to get
          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false,
              nonceGroup, nonce);
          r = Result.create(results);
        }
        success = true;
      } finally {
        // Only end the nonce operation we actually started; success records whether the
        // duplicate-detection state should treat this nonce as completed.
        if (canProceed) {
          endNonceOperation(mutation, nonceGroup, success);
        }
      }
      if (region.getCoprocessorHost() != null) {
        r = region.getCoprocessorHost().postAppend(append, r);
      }
    }
    if (regionServer.metricsRegionServer != null) {
      regionServer.metricsRegionServer.updateAppend(
          region.getTableDescriptor().getTableName(),
          EnvironmentEdgeManager.currentTime() - before);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Execute an increment mutation. Mirrors {@link #append} (nonce handling, coprocessor
   * pre/post hooks, quota and cell-size checks, metrics) but for Increment.
   *
   * @param region the region to run the increment against
   * @param mutation the protobuf mutation carrying the increment (and optional nonce)
   * @return the Result; never null -- {@code Result.EMPTY_RESULT} when there is nothing to
   *     return
   * @throws IOException on any failure applying the increment
   */
  private Result increment(final HRegion region, final OperationQuota quota,
      final MutationProto mutation, final CellScanner cells, long nonceGroup,
      ActivePolicyEnforcement spaceQuota)
      throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
    checkCellSizeLimit(region, increment);
    spaceQuota.getPolicyEnforcement(region).check(increment);
    quota.addMutation(increment);
    Result r = null;
    if (region.getCoprocessorHost() != null) {
      // A non-null result here means the coprocessor bypassed the default increment.
      r = region.getCoprocessorHost().preIncrement(increment);
    }
    if (r == null) {
      boolean canProceed = startNonceOperation(mutation, nonceGroup);
      boolean success = false;
      try {
        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
        if (canProceed) {
          r = region.increment(increment, nonceGroup, nonce);
        } else {
          // convert duplicate increment to get
          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup,
              nonce);
          r = Result.create(results);
        }
        success = true;
      } finally {
        if (canProceed) {
          endNonceOperation(mutation, nonceGroup, success);
        }
      }
      if (region.getCoprocessorHost() != null) {
        r = region.getCoprocessorHost().postIncrement(increment, r);
      }
    }
    if (regionServer.metricsRegionServer != null) {
      regionServer.metricsRegionServer.updateIncrement(
          region.getTableDescriptor().getTableName(),
          EnvironmentEdgeManager.currentTime() - before);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
   * @param cellsToReturn Could be null. May be allocated in this method. This is what this
   *     method returns as a 'result'.
   * @param closeCallBack the callback to be used with multigets
   * @param context the current RpcCallContext
   * @return Return the <code>cellScanner</code> passed
   */
  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
      final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
      final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
      final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
      ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
    // one at a time, we instead pass them in batch.
Be aware that the corresponding 775 // ResultOrException instance that matches each Put or Delete is then added down in the 776 // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are 777 // deferred/batched 778 List<ClientProtos.Action> mutations = null; 779 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 780 IOException sizeIOE = null; 781 Object lastBlock = null; 782 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); 783 boolean hasResultOrException = false; 784 for (ClientProtos.Action action : actions.getActionList()) { 785 hasResultOrException = false; 786 resultOrExceptionBuilder.clear(); 787 try { 788 Result r = null; 789 790 if (context != null 791 && context.isRetryImmediatelySupported() 792 && (context.getResponseCellSize() > maxQuotaResultSize 793 || context.getResponseBlockSize() + context.getResponseExceptionSize() 794 > maxQuotaResultSize)) { 795 796 // We're storing the exception since the exception and reason string won't 797 // change after the response size limit is reached. 798 if (sizeIOE == null ) { 799 // We don't need the stack un-winding do don't throw the exception. 800 // Throwing will kill the JVM's JIT. 801 // 802 // Instead just create the exception and then store it. 803 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" 804 + " CellSize: " + context.getResponseCellSize() 805 + " BlockSize: " + context.getResponseBlockSize()); 806 807 // Only report the exception once since there's only one request that 808 // caused the exception. Otherwise this number will dominate the exceptions count. 809 rpcServer.getMetrics().exception(sizeIOE); 810 } 811 812 // Now that there's an exception is known to be created 813 // use it for the response. 814 // 815 // This will create a copy in the builder. 
816 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); 817 resultOrExceptionBuilder.setException(pair); 818 context.incrementResponseExceptionSize(pair.getSerializedSize()); 819 resultOrExceptionBuilder.setIndex(action.getIndex()); 820 builder.addResultOrException(resultOrExceptionBuilder.build()); 821 skipCellsForMutation(action, cellScanner); 822 continue; 823 } 824 if (action.hasGet()) { 825 long before = EnvironmentEdgeManager.currentTime(); 826 ClientProtos.Get pbGet = action.getGet(); 827 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 828 // a get closest before. Throwing the UnknownProtocolException signals it that it needs 829 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 830 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 831 if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) { 832 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? 
" + 833 "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " + 834 "reverse Scan."); 835 } 836 try { 837 Get get = ProtobufUtil.toGet(pbGet); 838 if (context != null) { 839 r = get(get, (region), closeCallBack, context); 840 } else { 841 r = region.get(get); 842 } 843 } finally { 844 if (regionServer.metricsRegionServer != null) { 845 regionServer.metricsRegionServer.updateGet( 846 region.getTableDescriptor().getTableName(), 847 EnvironmentEdgeManager.currentTime() - before); 848 } 849 } 850 } else if (action.hasServiceCall()) { 851 hasResultOrException = true; 852 com.google.protobuf.Message result = 853 execServiceOnRegion(region, action.getServiceCall()); 854 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = 855 ClientProtos.CoprocessorServiceResult.newBuilder(); 856 resultOrExceptionBuilder.setServiceResult( 857 serviceResultBuilder.setValue( 858 serviceResultBuilder.getValueBuilder() 859 .setName(result.getClass().getName()) 860 // TODO: Copy!!! 861 .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); 862 } else if (action.hasMutation()) { 863 MutationType type = action.getMutation().getMutateType(); 864 if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && 865 !mutations.isEmpty()) { 866 // Flush out any Puts or Deletes already collected. 
867 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, 868 spaceQuotaEnforcement); 869 mutations.clear(); 870 } 871 switch (type) { 872 case APPEND: 873 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, 874 spaceQuotaEnforcement); 875 break; 876 case INCREMENT: 877 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, 878 spaceQuotaEnforcement); 879 break; 880 case PUT: 881 case DELETE: 882 // Collect the individual mutations and apply in a batch 883 if (mutations == null) { 884 mutations = new ArrayList<>(actions.getActionCount()); 885 } 886 mutations.add(action); 887 break; 888 default: 889 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 890 } 891 } else { 892 throw new HBaseIOException("Unexpected Action type"); 893 } 894 if (r != null) { 895 ClientProtos.Result pbResult = null; 896 if (isClientCellBlockSupport(context)) { 897 pbResult = ProtobufUtil.toResultNoData(r); 898 // Hard to guess the size here. Just make a rough guess. 899 if (cellsToReturn == null) { 900 cellsToReturn = new ArrayList<>(); 901 } 902 cellsToReturn.add(r); 903 } else { 904 pbResult = ProtobufUtil.toResult(r); 905 } 906 lastBlock = addSize(context, r, lastBlock); 907 hasResultOrException = true; 908 resultOrExceptionBuilder.setResult(pbResult); 909 } 910 // Could get to here and there was no result and no exception. Presumes we added 911 // a Put or Delete to the collecting Mutations List for adding later. In this 912 // case the corresponding ResultOrException instance for the Put or Delete will be added 913 // down in the doNonAtomicBatchOp method call rather than up here. 
914 } catch (IOException ie) { 915 rpcServer.getMetrics().exception(ie); 916 hasResultOrException = true; 917 NameBytesPair pair = ResponseConverter.buildException(ie); 918 resultOrExceptionBuilder.setException(pair); 919 context.incrementResponseExceptionSize(pair.getSerializedSize()); 920 } 921 if (hasResultOrException) { 922 // Propagate index. 923 resultOrExceptionBuilder.setIndex(action.getIndex()); 924 builder.addResultOrException(resultOrExceptionBuilder.build()); 925 } 926 } 927 // Finish up any outstanding mutations 928 if (!CollectionUtils.isEmpty(mutations)) { 929 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); 930 } 931 return cellsToReturn; 932 } 933 934 private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException { 935 if (r.maxCellSize > 0) { 936 CellScanner cells = m.cellScanner(); 937 while (cells.advance()) { 938 int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current()); 939 if (size > r.maxCellSize) { 940 String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes"; 941 if (LOG.isDebugEnabled()) { 942 LOG.debug(msg); 943 } 944 throw new DoNotRetryIOException(msg); 945 } 946 } 947 } 948 } 949 950 private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 951 final OperationQuota quota, final List<ClientProtos.Action> mutations, 952 final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) 953 throws IOException { 954 // Just throw the exception. The exception will be caught and then added to region-level 955 // exception for RegionAction. Leaving the null to action result is ok since the null 956 // result is viewed as failure by hbase client. And the region-lever exception will be used 957 // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and 958 // AsyncBatchRpcRetryingCaller#onComplete for more details. 
    doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
  }

  /**
   * Execute a non-atomic batch of Put/Delete mutations. On failure the exception is
   * recorded per action rather than as a region-level exception.
   */
  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
      final OperationQuota quota, final List<ClientProtos.Action> mutations,
      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in the same RegionAction are grouped
      // into different batches and then processed individually. Hence, we don't set the
      // region-level exception here for the whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations.
   *
   * @param builder result builder the per-action outcomes are appended to
   * @param region region the mutations apply to
   * @param mutations actions to apply; PUT or DELETE only
   * @param atomic whether the batch must apply atomically (client order is then preserved)
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
      final OperationQuota quota, final List<ClientProtos.Action> mutations,
      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
      throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /** HBASE-17924
       * mutationActionMap is a map to map the relation between mutations and actions
       * since the mutation array may have been reordered. In order to return the right
       * result or exception to the corresponding actions, we need to know which action
       * the mutation belongs to. We can't sort the ClientProtos.Action array, since they
       * are bound to cellscanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      for (ClientProtos.Action action: mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
            action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        if (m.getMutateType() == MutationType.PUT) {
          mutation = ProtobufUtil.toPut(m, cells);
          batchContainsPuts = true;
        } else {
          mutation = ProtobufUtil.toDelete(m, cells);
          batchContainsDelete = true;
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic,
      // order is preserved as it is expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
        HConstants.NO_NONCE);
      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        // Fall back to the array position only for atomic batches whose action carries no
        // index (non-atomic batches were sorted, so position i is meaningless there).
        int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i;
        Exception e = null;
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            builder.addResultOrException(getResultOrException(
              ClientProtos.Result.getDefaultInstance(), index));
            break;

          case STORE_TOO_BUSY:
            e = new RegionTooBusyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  // Publishes put/delete batch latency to the metrics source, when one is registered.
  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
      boolean batchContainsDelete) {
    if (regionServer.metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        regionServer.metricsRegionServer
          .updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime);
      }
      if (batchContainsDelete) {
        regionServer.metricsRegionServer
          .updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations.
The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * @param region region the WAL entries are replayed into
   * @param mutations replayed mutations; meta-family marker entries are consumed here and
   *   removed from the list before the remainder is batch-replayed
   * @param replaySeqId sequence id to replay the edits at
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *   exceptionMessage if any
   * @throws IOException
   */
  private OperationStatus [] doReplayBatchOp(final HRegion region,
      final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
        WALSplitter.MutationReplay m = it.next();

        if (m.type == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        // METAFAMILY cells are markers (compaction/flush/region-event/bulk-load), not data;
        // dispatch each to its replay handler and drop the whole entry from the batch.
        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          it.remove();
        }
      }
      requestCount.increment();
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(
        new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
    } finally {
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void closeAllScanners() {
    // Close any outstanding scanners. Means they'll get an UnknownScanner
    // exception next time they come in.
    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        // Best-effort shutdown: log and keep closing the remaining scanners.
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Exposed for testing
  interface LogDelegate {
    void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold);
  }

  private static LogDelegate DEFAULT_LOG_DELEGATE = new LogDelegate() {
    @Override
    public void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
          + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
          + RpcServer.getRequestUserName().orElse(null) + "/"
          + RpcServer.getRemoteAddress().orElse(null)
          + " first region in multi=" + firstRegionName);
      }
    }
  };

  private final LogDelegate ld;

  public RSRpcServices(HRegionServer rs) throws IOException {
    this(rs, DEFAULT_LOG_DELEGATE);
  }

  // Directly invoked only for testing
  RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
    this.ld = ld;
    regionServer = rs;
    rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
    RpcSchedulerFactory rpcSchedulerFactory;
    try {
      rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
        .getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException | InvocationTargetException |
        InstantiationException | IllegalAccessException e) {
      throw new IllegalArgumentException(e);
    }
    // Server to handle client requests.
    InetSocketAddress initialIsa;
    InetSocketAddress bindAddress;
    // This class doubles as the master's RPC services (MasterRpcServices extends it),
    // so the port/bind-address configuration keys differ by role.
    if (this instanceof MasterRpcServices) {
      String hostname = getHostname(rs.conf, true);
      int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
    } else {
      String hostname = getHostname(rs.conf, false);
      int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
        HConstants.DEFAULT_REGIONSERVER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(
        rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
    }
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = createPriority();
    // Using Address means we don't get the IP too. Shorten it more even to just the host name
    // w/o the domain.
    String name = rs.getProcessName() + "/" +
      Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
    // Set how many times to retry talking to another server over Connection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
    rpcServer = createRpcServer(rs, rs.conf, rpcSchedulerFactory, bindAddress, name);
    rpcServer.setRsRpcServices(this);
    scannerLeaseTimeoutPeriod = rs.conf.getInt(
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = rs.conf.getLong(
      HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout = rs.conf.getInt(
      HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = rs.conf.getLong(
      REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);

    InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, however we need the final port that was given to rpcServer
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);
    rs.setName(name);

    closedScanners = CacheBuilder.newBuilder()
      .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  /**
   * Builds the RPC server bound to <code>bindAddress</code>; overridable for subclasses
   * (e.g. master) that need a different server flavor.
   */
  protected RpcServerInterface createRpcServer(Server server, Configuration conf,
      RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)
      throws IOException {
    boolean reservoirEnabled = conf.getBoolean(RESERVOIR_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(),
        bindAddress, // use final bindAddress for this server.
        conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      throw new IOException(be.getMessage() + ". To switch ports use the '"
        + HConstants.REGIONSERVER_PORT + "' configuration property.",
        be.getCause() != null ? be.getCause() : be);
    }
  }

  protected Class<?> getRpcSchedulerFactoryClass() {
    return this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  }

  @Override
  public void onConfigurationChange(Configuration newConf) {
    if (rpcServer instanceof ConfigurationObserver) {
      ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
    }
  }

  protected PriorityFunction createPriority() {
    return new AnnotationReadingPriorityFunction(this);
  }

  // No-op when authorization is disabled (accessChecker is only set in start()).
  protected void requirePermission(String request, Permission.Action perm) throws IOException {
    if (accessChecker != null) {
      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, perm);
    }
  }


  /**
   * Resolves the hostname this process should report: the configured override when present,
   * otherwise a reverse-DNS lookup on the role-specific interface/nameserver settings.
   */
  public static String getHostname(Configuration conf, boolean isMaster)
      throws UnknownHostException {
    String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
      HRegionServer.RS_HOSTNAME_KEY);
    if (hostname == null || hostname.isEmpty()) {
      String masterOrRS = isMaster ? "master" : "regionserver";
      return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
        conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
        conf.get("hbase."
        + masterOrRS + ".dns.nameserver", "default")));
    } else {
      LOG.info("hostname is configured to be " + hostname);
      return hostname;
    }
  }

  @VisibleForTesting
  public int getScannersCount() {
    return scanners.size();
  }

  // Returns the scanner registered under the given id, or null if unknown/expired.
  public RegionScanner getScanner(long scannerId) {
    String scannerIdString = Long.toString(scannerId);
    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
    if (scannerHolder != null) {
      return scannerHolder.s;
    }
    return null;
  }

  // Human-readable "table/region" description for the given scanner id, or null if unknown.
  public String getScanDetailsWithId(long scannerId) {
    RegionScanner scanner = getScanner(scannerId);
    if (scanner == null) {
      return null;
    }
    StringBuilder builder = new StringBuilder();
    builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
    builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
    return builder.toString();
  }

  /**
   * Get the vtime associated with the scanner.
   * Currently the vtime is the number of "next" calls.
   */
  long getScannerVirtualTime(long scannerId) {
    String scannerIdString = Long.toString(scannerId);
    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
    if (scannerHolder != null) {
      return scannerHolder.getNextCallSeq();
    }
    return 0L;
  }

  /**
   * Method to account for the size of retained cells and retained data blocks.
   * @return an object that represents the last referenced block from this response.
   */
  Object addSize(RpcCallContext context, Result r, Object lastBlock) {
    if (context != null && r != null && !r.isEmpty()) {
      for (Cell c : r.rawCells()) {
        context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));

        // Since byte buffers can point all kinds of crazy places it's harder to keep track
        // of which blocks are kept alive by what byte buffer.
        // So we make a guess.
        if (c instanceof ByteBufferExtendedCell) {
          ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
          ByteBuffer bb = bbCell.getValueByteBuffer();
          if (bb != lastBlock) {
            context.incrementResponseBlockSize(bb.capacity());
            lastBlock = bb;
          }
        } else {
          // We're using the last block being the same as the current block as
          // a proxy for pointing to a new block. This won't be exact.
          // If there are multiple gets that bounce back and forth
          // Then it's possible that this will over count the size of
          // referenced blocks. However it's better to over count and
          // use two rpcs than to OOME the regionserver.
          byte[] valueArray = c.getValueArray();
          if (valueArray != lastBlock) {
            context.incrementResponseBlockSize(valueArray.length);
            lastBlock = valueArray;
          }
        }

      }
    }
    return lastBlock;
  }

  // Registers a newly opened scanner: attaches a lease for timeout handling plus
  // shipped/close callbacks, and indexes the holder under its scanner name.
  private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
      HRegion r, boolean needCursor) throws LeaseStillHeldException {
    Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
      new ScannerListener(scannerName));
    RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
    RpcCallback closeCallback;
    if (s instanceof RpcCallback) {
      closeCallback = (RpcCallback) s;
    } else {
      closeCallback = new RegionScannerCloseCallBack(s);
    }
    RegionScannerHolder rsh =
      new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback, needCursor);
    RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " +
      scannerName;
    return rsh;
  }

  /**
   * Find the HRegion based on a region specifier
   *
   * @param regionSpecifier the region specifier
   * @return the corresponding region
   * @throws IOException if the specifier is not null,
   *   but failed to find the region
   */
  @VisibleForTesting
  public HRegion getRegion(
      final RegionSpecifier regionSpecifier) throws IOException {
    return regionServer.getRegion(regionSpecifier.getValue().toByteArray());
  }

  /**
   * Find the List of HRegions based on a list of region specifiers
   *
   * @param regionSpecifiers the list of region specifiers
   * @param stats records a NotServingRegionException per specifier that is not on this server
   * @return the corresponding list of regions found on this server
   */
  private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers,
      final CacheEvictionStatsBuilder stats) {
    List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size());
    for (RegionSpecifier regionSpecifier: regionSpecifiers) {
      try {
        regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray()));
      } catch (NotServingRegionException e) {
        // Record the miss and keep resolving the rest.
        stats.addException(regionSpecifier.getValue().toByteArray(), e);
      }
    }
    return regions;
  }

  @VisibleForTesting
  public PriorityFunction getPriority() {
    return priority;
  }

  @VisibleForTesting
  public Configuration getConfiguration() {
    return regionServer.getConfiguration();
  }

  private RegionServerRpcQuotaManager getRpcQuotaManager() {
    return regionServer.getRegionServerRpcQuotaManager();
  }

  private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
    return regionServer.getRegionServerSpaceQuotaManager();
  }

  // Starts the RPC server; wires up authorization checking when it is enabled in conf.
  void start(ZKWatcher zkWatcher) {
    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
      accessChecker = new AccessChecker(getConfiguration(), zkWatcher);
    }
    this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName);
    rpcServer.start();
  }

  void stop() {
    if (accessChecker != null) {
      accessChecker.stop();
    }
    closeAllScanners();
    rpcServer.stop();
  }

  /**
   * Called to verify that this server is up and running.
   */
  // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name).
  protected void checkOpen() throws IOException {
    if (regionServer.isAborted()) {
      throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
    }
    if (regionServer.isStopped()) {
      throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
    }
    if (!regionServer.fsOk) {
      throw new RegionServerStoppedException("File system not available");
    }
    if (!regionServer.isOnline()) {
      throw new ServerNotRunningYetException("Server " + regionServer.serverName
        + " is not running yet");
    }
  }

  /**
   * By default, put up an Admin and a Client Service.
   * Set booleans <code>hbase.regionserver.admin.executorService</code> and
   * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services.
   * Default is that both are enabled.
   * @return immutable list of blocking services and the security info classes that this server
   *   supports
   */
  protected List<BlockingServiceAndInterface> getServices() {
    boolean admin =
      getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
    boolean client =
      getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
    if (client) {
      bssi.add(new BlockingServiceAndInterface(
        ClientService.newReflectiveBlockingService(this),
        ClientService.BlockingInterface.class));
    }
    if (admin) {
      bssi.add(new BlockingServiceAndInterface(
        AdminService.newReflectiveBlockingService(this),
        AdminService.BlockingInterface.class));
    }
    return new org.apache.hbase.thirdparty.com.google.common.collect.
      ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
  }

  /** @return the address this RPC server is bound to */
  public InetSocketAddress getSocketAddress() {
    return isa;
  }

  @Override
  public int getPriority(RequestHeader header, Message param, User user) {
    return priority.getPriority(header, param, user);
  }

  @Override
  public long getDeadline(RequestHeader header, Message param) {
    return priority.getDeadline(header, param);
  }

  /*
   * Check if an OOME and, if so, abort immediately to avoid creating more objects.
   *
   * @param e the throwable to inspect
   *
   * @return True if we OOME'd and are aborting.
   */
  @Override
  public boolean checkOOME(final Throwable e) {
    return exitIfOOME(e);
  }

  /**
   * Halt the JVM if {@code e} is (or wraps, or merely mentions in its message) an
   * OutOfMemoryError.
   * <p>
   * {@code Runtime.halt(1)} is used rather than {@code System.exit()} so that no shutdown hooks
   * run and no further allocation is attempted while the heap is exhausted.
   *
   * @param e the throwable to inspect
   * @return true if an OOME was detected (the JVM halts in that case before returning)
   */
  public static boolean exitIfOOME(final Throwable e) {
    boolean stop = false;
    try {
      if (e instanceof OutOfMemoryError
          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
          || (e.getMessage() != null && e.getMessage().contains(
              "java.lang.OutOfMemoryError"))) {
        stop = true;
        LOG.error(HBaseMarkers.FATAL, "Run out of memory; "
          + RSRpcServices.class.getSimpleName() + " will abort itself immediately",
          e);
      }
    } finally {
      if (stop) {
        Runtime.getRuntime().halt(1);
      }
    }
    return stop;
  }

  /**
   * Close a region on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException raised while closing the region
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public CloseRegionResponse closeRegion(final RpcController controller,
      final CloseRegionRequest request) throws ServiceException {
    final ServerName sn = (request.hasDestinationServer() ?
      ProtobufUtil.toServerName(request.getDestinationServer()) : null);

    try {
      checkOpen();
      if (request.hasServerStartCode()) {
        // check that we are the same server that this RPC is intended for.
        long serverStartCode = request.getServerStartCode();
        if (regionServer.serverName.getStartcode() != serverStartCode) {
          throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
            "different server with startCode: " + serverStartCode + ", this server is: "
            + regionServer.serverName));
        }
      }
      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());

      requestCount.increment();
      if (sn == null) {
        LOG.info("Close " + encodedRegionName + " without moving");
      } else {
        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
      }
      boolean closed = regionServer.closeRegion(encodedRegionName, false, sn);
      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Compact a region on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException raised while requesting the compaction
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CompactRegionResponse compactRegion(final RpcController controller,
      final CompactRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      // Reject the request when quota support is enabled, the requesting user is not a
      // system/super user and an enforced quota policy disables compactions for this table.
      if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
          !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) &&
          this.regionServer.getRegionServerSpaceQuotaManager()
              .areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
        throw new DoNotRetryIOException(
            "Compactions on this region are " + "disabled due to a space quota violation.");
      }
      region.startRegionOperation(Operation.COMPACT_REGION);
      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
      boolean major = request.hasMajor() && request.getMajor();
      if (request.hasFamily()) {
        // Family-scoped compaction: only the named column family is compacted.
        byte[] family = request.getFamily().toByteArray();
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
            region.getRegionInfo().getRegionNameAsString() + " and family " +
            Bytes.toString(family);
        LOG.trace(log);
        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
          CompactionLifeCycleTracker.DUMMY);
      } else {
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
            region.getRegionInfo().getRegionNameAsString();
        LOG.trace(log);
        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
      }
      return CompactRegionResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Flush a region on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException raised while flushing
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public FlushRegionResponse flushRegion(final RpcController controller,
      final FlushRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
      boolean shouldFlush = true;
      if (request.hasIfOlderThanTs()) {
        // Skip the flush if every store has already been flushed at or after the requested ts.
        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
      }
      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
      if (shouldFlush) {
        boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
            request.getWriteFlushWalMarker() : false;
        // Go behind the curtain so we can manage writing of the flush WAL marker
        HRegion.FlushResultImpl flushResult =
            region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
        boolean compactionNeeded = flushResult.isCompactionNeeded();
        if (compactionNeeded) {
          regionServer.compactSplitThread.requestSystemCompaction(region,
            "Compaction through user triggered flush");
        }
        builder.setFlushed(flushResult.isFlushSucceeded());
        builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
      }
      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
      return builder.build();
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server.
      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
      throw new ServiceException(ex);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
      final GetOnlineRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      Map<String, HRegion> onlineRegions = regionServer.onlineRegions;
      List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
      for (HRegion region: onlineRegions.values()) {
        list.add(region.getRegionInfo());
      }
      // Return the regions in a stable, comparator-defined order.
      Collections.sort(list, RegionInfo.COMPARATOR);
      return ResponseConverter.buildGetOnlineRegionResponse(list);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Master implementation of this Admin Service differs given it is not
  // able to supply detail only known to RegionServer. See note on
  // MasterRpcServers#getRegionInfo.
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
      final GetRegionInfoRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      RegionInfo info = region.getRegionInfo();
      byte[] bestSplitRow = null;
      boolean shouldSplit = true;
      if (request.hasBestSplitRow() && request.getBestSplitRow()) {
        HRegion r = region;
        region.startRegionOperation(Operation.SPLIT_REGION);
        r.forceSplit(null);
        // Even after setting force split if split policy says no to split then we should not split.
        shouldSplit = region.getSplitPolicy().shouldSplit() && !info.isMetaRegion();
        bestSplitRow = r.checkSplit();
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if(bestSplitRow == null) {
          r.flush(true);
          bestSplitRow = r.checkSplit();
        }
        r.clearSplit();
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable() && shouldSplit);
      builder.setMergeable(region.isMergeable());
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Report the load of either every region of one table (when the request names a table) or of
   * every region hosted by this server.
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller,
      GetRegionLoadRequest request) throws ServiceException {

    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = regionServer.getRegions(tableName);
    } else {
      regions = regionServer.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    // Builders are reused across regions; createRegionLoad resets them per region.
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }

  /**
   * Clear the long and/or short compaction queues named in the request. Only one such request is
   * processed at a time (guarded by the {@code clearCompactionQueues} flag); a concurrent call is
   * logged and ignored.
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
      ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    if (clearCompactionQueues.compareAndSet(false,true)) {
      try {
        checkOpen();
        regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              regionServer.compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              regionServer.compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Always release the guard so later requests can run.
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }

  /**
   * Get some information of the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException raised by the open-check
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetServerInfoResponse getServerInfo(final RpcController controller,
      final GetServerInfoRequest request) throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
    requestCount.increment();
    // -1 signals that no info server is running.
    int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
    return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
  }

  /**
   * List the store files of the requested region, restricted to the column families named in the
   * request (all families of the table when the request names none).
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
      final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf: request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(
        columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server.
   *
   * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
   * before being called. As a consequence, this method should be called only from the master.
   * <p>
   * The different managed states for the region are:
   * </p><ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored. This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException for single-region opens, wrapping any IOException raised
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
      final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    if (request.hasServerStartCode()) {
      // check that we are the same server that this RPC is intended for.
      long serverStartCode = request.getServerStartCode();
      if (regionServer.serverName.getStartcode() != serverStartCode) {
        throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
          "different server with startCode: " + serverStartCode + ", this server is: "
          + regionServer.serverName));
      }
    }

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
            request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
      long endTime = System.currentTimeMillis() + timeout;
      synchronized (regionServer.online) {
        try {
          while (System.currentTimeMillis() <= endTime
              && !regionServer.isStopped() && !regionServer.isOnline()) {
            regionServer.online.wait(regionServer.msgInterval);
          }
          checkOpen();
        } catch (InterruptedException t) {
          // Restore the interrupt status before surfacing the failure.
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = regionServer.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:"
            + region.getRegionNameAsString() + ", which is already online";
          LOG.warn(error);
          //regionServer.abort(error);
          //throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // Mark the region as in-transition (TRUE == opening); `previous` tells us what, if
        // anything, was already in flight for this region.
        final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
          encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (regionServer.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:"
              + region.getRegionNameAsString() + ", which we are already trying to CLOSE";
            regionServer.abort(error);
            throw new IOException(error);
          }
          // The close completed; flip the in-transition marker to "opening".
          regionServer.regionsInTransitionInRS.put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" +
            region.getRegionNameAsString() + ", which we are already trying to OPEN"
            + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        regionServer.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          // Resolve the table descriptor, caching per-table for bulk assigns.
          htd = htds.get(region.getTable());
          if (htd == null) {
            htd = regionServer.tableDescriptors.get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (regionServer.executorService == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              regionServer.executorService.submit(new OpenMetaHandler(
                regionServer, regionServer, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
                  regionOpenInfo.getFavoredNodesList());
              }
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                regionServer.executorService.submit(new OpenPriorityRegionHandler(
                  regionServer, regionServer, region, htd, masterSystemTime));
              } else {
                regionServer.executorService.submit(new OpenRegionHandler(
                  regionServer, regionServer, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warm up a region on this server.
   *
   * This method should only be called by Master.
   * It synchronously opens the region and then
   * closes the region bringing the most important pages in cache.
   * <p>
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException raised while warming up the region
   */
  @Override
  public WarmupRegionResponse warmupRegion(final RpcController controller,
      final WarmupRegionRequest request) throws ServiceException {

    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
    TableDescriptor htd;
    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();

    try {
      checkOpen();
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final HRegion onlineRegion = regionServer.getRegion(encodedName);

      if (onlineRegion != null) {
        LOG.info("Region already online. Skipping warming up " + region);
        return response;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Warming up Region " + region.getRegionNameAsString());
      }

      htd = regionServer.tableDescriptors.get(region.getTable());

      if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
        LOG.info("Region is in transition. Skipping warmup " + region);
        return response;
      }

      HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
        regionServer.getConfiguration(), regionServer, null);

    } catch (IOException ie) {
      LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
      throw new ServiceException(ie);
    }

    return response;
  }

  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException raised during replay
   */
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
      final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    CellScanner cells = ((HBaseRpcController) controller).cellScanner();
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in one request must come from the same region; take it from the first entry.
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
          ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
            ? region.getCoprocessorHost()
            : null; // do not invoke coprocessors if this is a secondary region replica
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple " +
            "regions. First region:" + regionName.toStringUtf8() + " , other region:"
            + entry.getKey().getEncodedRegionName());
        }
        if (regionServer.nonceManager != null && isPrimary) {
          long nonceGroup = entry.getKey().hasNonceGroup()
              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ?
              entry.getKey().getNonce() : HConstants.NO_NONCE;
          // Register replayed nonces so retried client operations are still deduplicated.
          regionServer.nonceManager.reportOperationFromWal(
              nonceGroup,
              nonce,
              entry.getKey().getWriteTime());
        }
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
          cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
              walEntry.getSecond())) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if(edits!=null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
              entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      //sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }

      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
            entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency even on failure.
      if (regionServer.metricsRegionServer != null) {
        regionServer.metricsRegionServer.updateReplay(
          EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replicate WAL entries on the region server.
2217 * 2218 * @param controller the RPC controller 2219 * @param request the request 2220 * @throws ServiceException 2221 */ 2222 @Override 2223 @QosPriority(priority=HConstants.REPLICATION_QOS) 2224 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, 2225 final ReplicateWALEntryRequest request) throws ServiceException { 2226 try { 2227 checkOpen(); 2228 if (regionServer.replicationSinkHandler != null) { 2229 requestCount.increment(); 2230 List<WALEntry> entries = request.getEntryList(); 2231 CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner(); 2232 regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(); 2233 regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner, 2234 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), 2235 request.getSourceHFileArchiveDirPath()); 2236 regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(); 2237 return ReplicateWALEntryResponse.newBuilder().build(); 2238 } else { 2239 throw new ServiceException("Replication services are not initialized yet"); 2240 } 2241 } catch (IOException ie) { 2242 throw new ServiceException(ie); 2243 } 2244 } 2245 2246 /** 2247 * Roll the WAL writer of the region server. 
2248 * @param controller the RPC controller 2249 * @param request the request 2250 * @throws ServiceException 2251 */ 2252 @Override 2253 public RollWALWriterResponse rollWALWriter(final RpcController controller, 2254 final RollWALWriterRequest request) throws ServiceException { 2255 try { 2256 checkOpen(); 2257 requestCount.increment(); 2258 regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest(); 2259 regionServer.walRoller.requestRollAll(); 2260 regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest(); 2261 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); 2262 return builder.build(); 2263 } catch (IOException ie) { 2264 throw new ServiceException(ie); 2265 } 2266 } 2267 2268 2269 /** 2270 * Stop the region server. 2271 * 2272 * @param controller the RPC controller 2273 * @param request the request 2274 * @throws ServiceException 2275 */ 2276 @Override 2277 @QosPriority(priority=HConstants.ADMIN_QOS) 2278 public StopServerResponse stopServer(final RpcController controller, 2279 final StopServerRequest request) throws ServiceException { 2280 requestCount.increment(); 2281 String reason = request.getReason(); 2282 regionServer.stop(reason); 2283 return StopServerResponse.newBuilder().build(); 2284 } 2285 2286 @Override 2287 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, 2288 UpdateFavoredNodesRequest request) throws ServiceException { 2289 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList(); 2290 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder(); 2291 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) { 2292 RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion()); 2293 if (regionUpdateInfo.getFavoredNodesCount() > 0) { 2294 regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(), 2295 regionUpdateInfo.getFavoredNodesList()); 2296 } 2297 } 
    // Respond with the number of region entries seen (including ones skipped above).
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
      final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    // Replication-loop guard: if this cluster's id is already in the request's cluster id
    // list, the load has visited this cluster before, so report success without re-loading.
    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if(clusterIds.contains(this.regionServer.clusterId)){
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    } else {
      clusterIds.add(this.regionServer.clusterId);
    }
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Map<byte[], List<Path>> map = null;
      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
      long sizeToBeLoaded = -1;

      // Check to see if this bulk load would exceed the space quota for this table
      if (spaceQuotaEnabled) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(
            region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
        }
      }

      // Decode (family, path) pairs from the request.
      List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
      for (FamilyPath familyPath : request.getFamilyPathList()) {
        familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
      }
      if (!request.hasBulkToken()) {
        if (region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
        }
        try {
          map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
            request.getCopyFile(), clusterIds, request.getReplicate());
        } finally {
          if (region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
          }
        }
      } else {
        // secure bulk load
        map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
      }
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      if (map != null) {
        // Treat any negative size as a flag to "ignore" updating the region size as that is
        // not possible to occur in real life (cannot bulk load a file with negative size)
        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
                + sizeToBeLoaded + " bytes");
          }
          // Inform space quotas of the new files for this region
          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(
              region.getRegionInfo(), sizeToBeLoaded);
        }
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      if (regionServer.metricsRegionServer != null) {
        regionServer.metricsRegionServer.updateBulkLoad(
            EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  // Obtains a bulk-load token from the secure bulk load manager for the target region.
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
      PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Releases the resources associated with a previously prepared bulk load.
  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
      CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request);
      CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build();
      return response;
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Executes a region-scoped coprocessor endpoint call and wraps the result message
  // (name plus serialized bytes) into the response.
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
      final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(
        RegionSpecifierType.REGION_NAME,
          region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!!
      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).
        setValue(org.apache.hbase.thirdparty.com.google.protobuf.ByteString.
            copyFrom(result.toByteArray())));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  // Runs the coprocessor endpoint on the region with a fresh server-side controller.
  private com.google.protobuf.Message execServiceOnRegion(HRegion region,
      final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * Get data from a table.
   *
   * @param controller the RPC controller
   * @param request the get request
   * @throws ServiceException
   */
  @Override
  public GetResponse get(final RpcController controller,
      final GetRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? " +
            "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " +
            "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // Existence-only gets may be answered entirely by the preExists coprocessor hook.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        ClientProtos.Result pbr =
            ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Ship cells out-of-band via the cell block when the client supports it
        // (requires client version >= 1.3); otherwise embed them in the protobuf.
        if (isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller).setCellScanner(CellUtil.createCellScanner(r
              .rawCells()));
          addSize(context, r, null);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      // r.cells is null when a table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      MetricsRegionServer mrs = regionServer.metricsRegionServer;
      if (mrs != null) {
        TableDescriptor td = region
            != null? region.getTableDescriptor(): null;
        if (td != null) {
          mrs.updateGet(td.getTableName(), EnvironmentEdgeManager.currentTime() - before);
        }
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  // Server-side single-row get used by get() and multi(). The region scanner it opens is
  // registered with the RPC call context (or the supplied close callback) so it is closed
  // after the response has been shipped.
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
      RpcCallContext context) throws IOException {
    region.prepareGet(get);
    // Results served from a non-default replica are flagged stale.
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        region.metricsUpdateForGet(results, before);
        return Result
            .create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }

  // Sums the action count across all RegionActions in the request and logs a warning when
  // the batch is larger than rowSizeWarnThreshold. The first region's name is captured for
  // the warning message.
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      if (sum == 0) {
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      ld.logBatchWarning(firstRegionName, sum, rowSizeWarnThreshold);
    }
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   *
   * @param rpcc the RPC controller
   * @param request the multi request
   * @throws ServiceException
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
      throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController)rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
    // Clear scanner so we are not holding on to the reference across the call.
    if (controller != null) {
      controller.setCellScanner(null);
    }

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<CellScannable> cellsToReturn = null;
    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    Boolean processed = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
      .getRegionActionCount());
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
    // One RegionActionResult is emitted per RegionAction, in request order.
    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      regionActionResultBuilder.clear();
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
      } catch (IOException e) {
        rpcServer.getMetrics().exception(e);
        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
        // All Mutations in this RegionAction not executed as we can not see the Region online here
        // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
        // corresponding to these Mutations.
        skipCellsForMutations(regionAction.getActionList(), cellScanner);
        continue;  // For this region it's a failure.
      }

      if (regionAction.hasAtomic() && regionAction.getAtomic()) {
        // How does this call happen? It may need some work to play well w/ the surroundings.
        // Need to return an item per Action along w/ Action index. TODO.
        try {
          if (request.hasCondition()) {
            // Conditional atomic batch (checkAndRowMutate): decode the condition from the
            // request, then apply the whole action list only if it holds.
            Condition condition = request.getCondition();
            byte[] row = condition.getRow().toByteArray();
            byte[] family = condition.getFamily().toByteArray();
            byte[] qualifier = condition.getQualifier().toByteArray();
            CompareOperator op =
              CompareOperator.valueOf(condition.getCompareType().name());
            ByteArrayComparable comparator =
              ProtobufUtil.toComparator(condition.getComparator());
            TimeRange timeRange = condition.hasTimeRange() ?
              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
              TimeRange.allTime();
            processed =
              checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
                qualifier, op, comparator, timeRange, regionActionResultBuilder,
                spaceQuotaEnforcement);
          } else {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's atomic, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } else {
        // doNonAtomicRegionMutation manages the exception internally
        if (context != null && closeCallBack == null) {
          // An RpcCallBack that creates a list of scanners that needs to perform callBack
          // operation on completion of multiGets.
          // Set this only once
          closeCallBack = new RegionScannersCloseCallBack();
          context.setCallBack(closeCallBack);
        }
        cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
          regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
          spaceQuotaEnforcement);
      }
      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      quota.close();
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if(regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
    }

    if (processed != null) {
      responseBuilder.setProcessed(processed);
    }

    // Attach per-region load statistics so clients can do backoff.
    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for(Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat: regionStats.entrySet()){
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  // Advances the shared CellScanner past the cells of every mutation in the given actions.
  // Called when a whole RegionAction fails before its mutations are decoded, so subsequent
  // RegionActions read their own cells from the correct position.
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  // Advances the shared CellScanner past the cells belonging to a single mutation.
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these individual Mutation level issue. Any way this entire RegionAction
      // marked as failed as we could not see the Region here. At client side the top level
      // RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   *
   * @param rpcc the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc,
      final MutateRequest request) throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController)rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    ActivePolicyEnforcement spaceQuotaEnforcement = null;
    MutationType type = null;
    HRegion region = null;
    long before = EnvironmentEdgeManager.currentTime();
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      rpcMutateRequestCount.increment();
      region = getRegion(request.getRegion());
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ?
          request.getNonceGroup() : HConstants.NO_NONCE;
      Result r = null;
      Boolean processed = null;
      type = mutation.getMutateType();

      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
      spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

      switch (type) {
        case APPEND:
          // TODO: this doesn't actually check anything.
          r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
          break;
        case INCREMENT:
          // TODO: this doesn't actually check anything.
          r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
          break;
        case PUT:
          Put put = ProtobufUtil.toPut(mutation, cellScanner);
          checkCellSizeLimit(region, put);
          // Throws an exception when violated
          spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
          quota.addMutation(put);
          if (request.hasCondition()) {
            // checkAndPut: apply the Put only when the decoded condition holds; coprocessor
            // preCheckAndPut may short-circuit by returning a non-null result.
            Condition condition = request.getCondition();
            byte[] row = condition.getRow().toByteArray();
            byte[] family = condition.getFamily().toByteArray();
            byte[] qualifier = condition.getQualifier().toByteArray();
            CompareOperator compareOp =
              CompareOperator.valueOf(condition.getCompareType().name());
            ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
            TimeRange timeRange = condition.hasTimeRange() ?
              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
              TimeRange.allTime();
            if (region.getCoprocessorHost() != null) {
              processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
                compareOp, comparator, put);
            }
            if (processed == null) {
              boolean result = region.checkAndMutate(row, family,
                qualifier, compareOp, comparator, timeRange, put);
              if (region.getCoprocessorHost() != null) {
                result = region.getCoprocessorHost().postCheckAndPut(row, family,
                  qualifier, compareOp, comparator, put, result);
              }
              processed = result;
            }
          } else {
            region.put(put);
            processed = Boolean.TRUE;
          }
          break;
        case DELETE:
          Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
          checkCellSizeLimit(region, delete);
          // Throws an exception when violated
          spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
          quota.addMutation(delete);
          if (request.hasCondition()) {
            // checkAndDelete: same structure as checkAndPut above.
            Condition condition = request.getCondition();
            byte[] row = condition.getRow().toByteArray();
            byte[] family = condition.getFamily().toByteArray();
            byte[] qualifier = condition.getQualifier().toByteArray();
            CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
            ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
            TimeRange timeRange = condition.hasTimeRange() ?
              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
              TimeRange.allTime();
            if (region.getCoprocessorHost() != null) {
              processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
                comparator, delete);
            }
            if (processed == null) {
              boolean result = region.checkAndMutate(row, family,
                qualifier, op, comparator, timeRange, delete);
              if (region.getCoprocessorHost() != null) {
                result = region.getCoprocessorHost().postCheckAndDelete(row, family,
                  qualifier, op, comparator, delete, result);
              }
              processed = result;
            }
          } else {
            region.delete(delete);
            processed = Boolean.TRUE;
          }
          break;
        default:
          throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
      }
      if (processed != null) {
        builder.setProcessed(processed.booleanValue());
      }
      boolean clientCellBlockSupported = isClientCellBlockSupport(context);
      addResult(builder, r, controller, clientCellBlockSupported);
      if (clientCellBlockSupported) {
        addSize(context, r, null);
      }
      return builder.build();
    } catch (IOException ie) {
      regionServer.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
      // Update metrics
      if (regionServer.metricsRegionServer != null && type != null) {
        long after = EnvironmentEdgeManager.currentTime();
        switch (type) {
          case DELETE:
            if (request.hasCondition()) {
              regionServer.metricsRegionServer.updateCheckAndDelete(after - before);
            } else {
              regionServer.metricsRegionServer.updateDelete(
                region == null ? null : region.getRegionInfo().getTable(), after - before);
            }
            break;
          case PUT:
            if (request.hasCondition()) {
              regionServer.metricsRegionServer.updateCheckAndPut(after - before);
            } else {
              regionServer.metricsRegionServer.updatePut(
                region == null ?
null : region.getRegionInfo().getTable(),after - before); 2919 } 2920 break; 2921 default: 2922 break; 2923 2924 } 2925 } 2926 } 2927 } 2928 2929 // This is used to keep compatible with the old client implementation. Consider remove it if we 2930 // decide to drop the support of the client that still sends close request to a region scanner 2931 // which has already been exhausted. 2932 @Deprecated 2933 private static final IOException SCANNER_ALREADY_CLOSED = new IOException() { 2934 2935 private static final long serialVersionUID = -4305297078988180130L; 2936 2937 @Override 2938 public synchronized Throwable fillInStackTrace() { 2939 return this; 2940 } 2941 }; 2942 2943 private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException { 2944 String scannerName = Long.toString(request.getScannerId()); 2945 RegionScannerHolder rsh = scanners.get(scannerName); 2946 if (rsh == null) { 2947 // just ignore the next or close request if scanner does not exists. 2948 if (closedScanners.getIfPresent(scannerName) != null) { 2949 throw SCANNER_ALREADY_CLOSED; 2950 } else { 2951 LOG.warn("Client tried to access missing scanner " + scannerName); 2952 throw new UnknownScannerException( 2953 "Unknown scanner '" + scannerName + "'. 
This can happen due to any of the following " + 2954 "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " + 2955 "long wait between consecutive client checkins, c) Server may be closing down, " + 2956 "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " + 2957 "possible fix would be increasing the value of" + 2958 "'hbase.client.scanner.timeout.period' configuration."); 2959 } 2960 } 2961 RegionInfo hri = rsh.s.getRegionInfo(); 2962 // Yes, should be the same instance 2963 if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) { 2964 String msg = "Region has changed on the scanner " + scannerName + ": regionName=" 2965 + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r; 2966 LOG.warn(msg + ", closing..."); 2967 scanners.remove(scannerName); 2968 try { 2969 rsh.s.close(); 2970 } catch (IOException e) { 2971 LOG.warn("Getting exception closing " + scannerName, e); 2972 } finally { 2973 try { 2974 regionServer.leases.cancelLease(scannerName); 2975 } catch (LeaseException e) { 2976 LOG.warn("Getting exception closing " + scannerName, e); 2977 } 2978 } 2979 throw new NotServingRegionException(msg); 2980 } 2981 return rsh; 2982 } 2983 2984 private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder) 2985 throws IOException { 2986 HRegion region = getRegion(request.getRegion()); 2987 ClientProtos.Scan protoScan = request.getScan(); 2988 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); 2989 Scan scan = ProtobufUtil.toScan(protoScan); 2990 // if the request doesn't set this, get the default region setting. 
    if (!isLoadingCfsOnDemandSet) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }

    if (!scan.hasFamilies()) {
      // Adding all families to scanner
      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
        scan.addFamily(family);
      }
    }
    if (region.getCoprocessorHost() != null) {
      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
      // wrapper for the core created RegionScanner
      region.getCoprocessorHost().preScannerOpen(scan);
    }
    RegionScannerImpl coreScanner = region.getScanner(scan);
    // Keep a handle on the core scanner as the Shipper even if the post hook wraps the scanner.
    Shipper shipper = coreScanner;
    RegionScanner scanner = coreScanner;
    if (region.getCoprocessorHost() != null) {
      scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
    }
    long scannerId = scannerIdGenerator.generateNewScannerId();
    builder.setScannerId(scannerId);
    builder.setMvccReadPoint(scanner.getMvccReadPoint());
    builder.setTtl(scannerLeaseTimeoutPeriod);
    String scannerName = String.valueOf(scannerId);
    return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult());
  }

  // Rejects out-of-order scan RPCs by checking the request's nextCallSeq against the
  // scanner holder's expected sequence number.
  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
      throws OutOfOrderScannerNextException {
    // if nextCallSeq does not match throw Exception straight away. This needs to be
    // performed even before checking of Lease.
    // See HBASE-5974
    if (request.hasNextCallSeq()) {
      long callSeq = request.getNextCallSeq();
      if (!rsh.incNextCallSeq(callSeq)) {
        throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq()
          + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request="
          + TextFormat.shortDebugString(request));
      }
    }
  }

  // Re-registers a scanner lease with the lease manager.
  private void addScannerLeaseBack(Leases.Lease lease) {
    try {
      regionServer.leases.addLease(lease);
    } catch (LeaseStillHeldException e) {
      // should not happen as the scanner id is unique.
      throw new AssertionError(e);
    }
  }

  /**
   * Computes the wall-clock deadline for a scan RPC, or -1 when no time limit applies.
   * @param allowHeartbeatMessages whether the client can accept heartbeat responses
   */
  private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) {
    // Set the time limit to be half of the more restrictive timeout value (one of the
    // timeout values must be positive). In the event that both values are positive, the
    // more restrictive of the two is used to calculate the limit.
    if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
      long timeLimitDelta;
      if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
        timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
      } else {
        timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
      }
      if (controller != null && controller.getCallTimeout() > 0) {
        timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
      }
      // Use half of whichever timeout value was more restrictive... But don't allow
      // the time limit to be less than the allowable minimum (could cause an
      // immediate timeout before scanning any data).
      timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
      // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
      // ManualEnvironmentEdge. Consider using System.nanoTime instead.
      return System.currentTimeMillis() + timeLimitDelta;
    }
    // Default value of timeLimit is negative to indicate no timeLimit should be
    // enforced.
    return -1L;
  }

  // If the scan has produced at least limitOfRows complete rows, mark the response
  // as having no more results.
  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
      ScannerContext scannerContext, ScanResponse.Builder builder) {
    if (numOfCompleteRows >= limitOfRows) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows +
          " scannerContext: " + scannerContext);
      }
      builder.setMoreResults(false);
    }
  }

  // return whether we have more results in region.
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
      long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
      ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context)
      throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    long maxResultSize;
    // The effective size limit is the tighter of the scanner's own max result size and the
    // quota-derived limit.
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
3098 List<Cell> values = new ArrayList<>(32); 3099 region.startRegionOperation(Operation.SCAN); 3100 try { 3101 int numOfResults = 0; 3102 int numOfCompleteRows = 0; 3103 long before = EnvironmentEdgeManager.currentTime(); 3104 synchronized (scanner) { 3105 boolean stale = (region.getRegionInfo().getReplicaId() != 0); 3106 boolean clientHandlesPartials = 3107 request.hasClientHandlesPartials() && request.getClientHandlesPartials(); 3108 boolean clientHandlesHeartbeats = 3109 request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); 3110 3111 // On the server side we must ensure that the correct ordering of partial results is 3112 // returned to the client to allow them to properly reconstruct the partial results. 3113 // If the coprocessor host is adding to the result list, we cannot guarantee the 3114 // correct ordering of partial results and so we prevent partial results from being 3115 // formed. 3116 boolean serverGuaranteesOrderOfPartials = results.isEmpty(); 3117 boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials; 3118 boolean moreRows = false; 3119 3120 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a 3121 // certain time threshold on the server. When the time threshold is exceeded, the 3122 // server stops the scan and sends back whatever Results it has accumulated within 3123 // that time period (may be empty). Since heartbeat messages have the potential to 3124 // create partial Results (in the event that the timeout occurs in the middle of a 3125 // row), we must only generate heartbeat messages when the client can handle both 3126 // heartbeats AND partials 3127 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; 3128 3129 long timeLimit = getTimeLimit(controller, allowHeartbeatMessages); 3130 3131 final LimitScope sizeScope = 3132 allowPartialResults ? 
LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; 3133 final LimitScope timeScope = 3134 allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; 3135 3136 boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics(); 3137 3138 // Configure with limits for this RPC. Set keep progress true since size progress 3139 // towards size limit should be kept between calls to nextRaw 3140 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); 3141 // maxResultSize - either we can reach this much size for all cells(being read) data or sum 3142 // of heap size occupied by cells(being read). Cell data means its key and value parts. 3143 contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize); 3144 contextBuilder.setBatchLimit(scanner.getBatch()); 3145 contextBuilder.setTimeLimit(timeScope, timeLimit); 3146 contextBuilder.setTrackMetrics(trackMetrics); 3147 ScannerContext scannerContext = contextBuilder.build(); 3148 boolean limitReached = false; 3149 while (numOfResults < maxResults) { 3150 // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The 3151 // batch limit is a limit on the number of cells per Result. Thus, if progress is 3152 // being tracked (i.e. scannerContext.keepProgress() is true) then we need to 3153 // reset the batch progress between nextRaw invocations since we don't want the 3154 // batch progress from previous calls to affect future calls 3155 scannerContext.setBatchProgress(0); 3156 3157 // Collect values to be returned here 3158 moreRows = scanner.nextRaw(values, scannerContext); 3159 3160 if (!values.isEmpty()) { 3161 if (limitOfRows > 0) { 3162 // First we need to check if the last result is partial and we have a row change. If 3163 // so then we need to increase the numOfCompleteRows. 
3164 if (results.isEmpty()) { 3165 if (rsh.rowOfLastPartialResult != null && 3166 !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)) { 3167 numOfCompleteRows++; 3168 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, 3169 builder); 3170 } 3171 } else { 3172 Result lastResult = results.get(results.size() - 1); 3173 if (lastResult.mayHaveMoreCellsInRow() && 3174 !CellUtil.matchingRows(values.get(0), lastResult.getRow())) { 3175 numOfCompleteRows++; 3176 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, 3177 builder); 3178 } 3179 } 3180 if (builder.hasMoreResults() && !builder.getMoreResults()) { 3181 break; 3182 } 3183 } 3184 boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow(); 3185 Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow); 3186 lastBlock.setValue(addSize(context, r, lastBlock.getValue())); 3187 results.add(r); 3188 numOfResults++; 3189 if (!mayHaveMoreCellsInRow && limitOfRows > 0) { 3190 numOfCompleteRows++; 3191 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder); 3192 if (builder.hasMoreResults() && !builder.getMoreResults()) { 3193 break; 3194 } 3195 } 3196 } else if (!moreRows && !results.isEmpty()) { 3197 // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of 3198 // last result is false. Otherwise it's possible that: the first nextRaw returned 3199 // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the 3200 // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will 3201 // return with moreRows=false, which means moreResultsInRegion would be false, it will 3202 // be a contradictory state (HBASE-21206). 
3203 int lastIdx = results.size() - 1; 3204 Result r = results.get(lastIdx); 3205 if (r.mayHaveMoreCellsInRow()) { 3206 results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false)); 3207 } 3208 } 3209 boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); 3210 boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); 3211 boolean resultsLimitReached = numOfResults >= maxResults; 3212 limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached; 3213 3214 if (limitReached || !moreRows) { 3215 if (LOG.isTraceEnabled()) { 3216 LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: " + moreRows 3217 + " scannerContext: " + scannerContext); 3218 } 3219 // We only want to mark a ScanResponse as a heartbeat message in the event that 3220 // there are more values to be read server side. If there aren't more values, 3221 // marking it as a heartbeat is wasteful because the client will need to issue 3222 // another ScanRequest only to realize that they already have all the values 3223 if (moreRows && timeLimitReached) { 3224 // Heartbeat messages occur when the time limit has been reached. 3225 builder.setHeartbeatMessage(true); 3226 if (rsh.needCursor) { 3227 Cell cursorCell = scannerContext.getLastPeekedCell(); 3228 if (cursorCell != null) { 3229 builder.setCursor(ProtobufUtil.toCursor(cursorCell)); 3230 } 3231 } 3232 } 3233 break; 3234 } 3235 values.clear(); 3236 } 3237 builder.setMoreResultsInRegion(moreRows); 3238 // Check to see if the client requested that we track metrics server side. If the 3239 // client requested metrics, retrieve the metrics from the scanner context. 
3240 if (trackMetrics) { 3241 Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap(); 3242 ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder(); 3243 NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder(); 3244 3245 for (Entry<String, Long> entry : metrics.entrySet()) { 3246 pairBuilder.setName(entry.getKey()); 3247 pairBuilder.setValue(entry.getValue()); 3248 metricBuilder.addMetrics(pairBuilder.build()); 3249 } 3250 3251 builder.setScanMetrics(metricBuilder.build()); 3252 } 3253 } 3254 long end = EnvironmentEdgeManager.currentTime(); 3255 long responseCellSize = context != null ? context.getResponseCellSize() : 0; 3256 region.getMetrics().updateScanTime(end - before); 3257 if (regionServer.metricsRegionServer != null) { 3258 regionServer.metricsRegionServer.updateScanSize( 3259 region.getTableDescriptor().getTableName(), responseCellSize); 3260 regionServer.metricsRegionServer.updateScanTime( 3261 region.getTableDescriptor().getTableName(), end - before); 3262 } 3263 } finally { 3264 region.closeRegionOperation(); 3265 } 3266 // coprocessor postNext hook 3267 if (region.getCoprocessorHost() != null) { 3268 region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true); 3269 } 3270 } 3271 3272 /** 3273 * Scan data in a table. 3274 * 3275 * @param controller the RPC controller 3276 * @param request the scan request 3277 * @throws ServiceException 3278 */ 3279 @Override 3280 public ScanResponse scan(final RpcController controller, final ScanRequest request) 3281 throws ServiceException { 3282 if (controller != null && !(controller instanceof HBaseRpcController)) { 3283 throw new UnsupportedOperationException( 3284 "We only do " + "HBaseRpcControllers! 
FIX IF A PROBLEM: " + controller); 3285 } 3286 if (!request.hasScannerId() && !request.hasScan()) { 3287 throw new ServiceException( 3288 new DoNotRetryIOException("Missing required input: scannerId or scan")); 3289 } 3290 try { 3291 checkOpen(); 3292 } catch (IOException e) { 3293 if (request.hasScannerId()) { 3294 String scannerName = Long.toString(request.getScannerId()); 3295 if (LOG.isDebugEnabled()) { 3296 LOG.debug( 3297 "Server shutting down and client tried to access missing scanner " + scannerName); 3298 } 3299 if (regionServer.leases != null) { 3300 try { 3301 regionServer.leases.cancelLease(scannerName); 3302 } catch (LeaseException le) { 3303 // No problem, ignore 3304 if (LOG.isTraceEnabled()) { 3305 LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); 3306 } 3307 } 3308 } 3309 } 3310 throw new ServiceException(e); 3311 } 3312 requestCount.increment(); 3313 rpcScanRequestCount.increment(); 3314 RegionScannerHolder rsh; 3315 ScanResponse.Builder builder = ScanResponse.newBuilder(); 3316 try { 3317 if (request.hasScannerId()) { 3318 // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 3319 // for more details. 3320 builder.setScannerId(request.getScannerId()); 3321 rsh = getRegionScanner(request); 3322 } else { 3323 rsh = newRegionScanner(request, builder); 3324 } 3325 } catch (IOException e) { 3326 if (e == SCANNER_ALREADY_CLOSED) { 3327 // Now we will close scanner automatically if there are no more results for this region but 3328 // the old client will still send a close request to us. Just ignore it and return. 3329 return builder.build(); 3330 } 3331 throw new ServiceException(e); 3332 } 3333 HRegion region = rsh.r; 3334 String scannerName = rsh.scannerName; 3335 Leases.Lease lease; 3336 try { 3337 // Remove lease while its being processed in server; protects against case 3338 // where processing of request takes > lease expiration time. 
3339 lease = regionServer.leases.removeLease(scannerName); 3340 } catch (LeaseException e) { 3341 throw new ServiceException(e); 3342 } 3343 if (request.hasRenew() && request.getRenew()) { 3344 // add back and return 3345 addScannerLeaseBack(lease); 3346 try { 3347 checkScanNextCallSeq(request, rsh); 3348 } catch (OutOfOrderScannerNextException e) { 3349 throw new ServiceException(e); 3350 } 3351 return builder.build(); 3352 } 3353 OperationQuota quota; 3354 try { 3355 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); 3356 } catch (IOException e) { 3357 addScannerLeaseBack(lease); 3358 throw new ServiceException(e); 3359 } 3360 try { 3361 checkScanNextCallSeq(request, rsh); 3362 } catch (OutOfOrderScannerNextException e) { 3363 addScannerLeaseBack(lease); 3364 throw new ServiceException(e); 3365 } 3366 // Now we have increased the next call sequence. If we give client an error, the retry will 3367 // never success. So we'd better close the scanner and return a DoNotRetryIOException to client 3368 // and then client will try to open a new scanner. 3369 boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; 3370 int rows; // this is scan.getCaching 3371 if (request.hasNumberOfRows()) { 3372 rows = request.getNumberOfRows(); 3373 } else { 3374 rows = closeScanner ? 0 : 1; 3375 } 3376 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 3377 // now let's do the real scan. 3378 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 3379 RegionScanner scanner = rsh.s; 3380 // this is the limit of rows for this scan, if we the number of rows reach this value, we will 3381 // close the scanner. 
3382 int limitOfRows; 3383 if (request.hasLimitOfRows()) { 3384 limitOfRows = request.getLimitOfRows(); 3385 } else { 3386 limitOfRows = -1; 3387 } 3388 MutableObject<Object> lastBlock = new MutableObject<>(); 3389 boolean scannerClosed = false; 3390 try { 3391 List<Result> results = new ArrayList<>(); 3392 if (rows > 0) { 3393 boolean done = false; 3394 // Call coprocessor. Get region info from scanner. 3395 if (region.getCoprocessorHost() != null) { 3396 Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); 3397 if (!results.isEmpty()) { 3398 for (Result r : results) { 3399 lastBlock.setValue(addSize(context, r, lastBlock.getValue())); 3400 } 3401 } 3402 if (bypass != null && bypass.booleanValue()) { 3403 done = true; 3404 } 3405 } 3406 if (!done) { 3407 scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, 3408 results, builder, lastBlock, context); 3409 } else { 3410 builder.setMoreResultsInRegion(!results.isEmpty()); 3411 } 3412 } else { 3413 // This is a open scanner call with numberOfRow = 0, so set more results in region to true. 3414 builder.setMoreResultsInRegion(true); 3415 } 3416 3417 quota.addScanResult(results); 3418 addResults(builder, results, (HBaseRpcController) controller, 3419 RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), 3420 isClientCellBlockSupport(context)); 3421 if (scanner.isFilterDone() && results.isEmpty()) { 3422 // If the scanner's filter - if any - is done with the scan 3423 // only set moreResults to false if the results is empty. This is used to keep compatible 3424 // with the old scan implementation where we just ignore the returned results if moreResults 3425 // is false. Can remove the isEmpty check after we get rid of the old implementation. 3426 builder.setMoreResults(false); 3427 } 3428 // Later we may close the scanner depending on this flag so here we need to make sure that we 3429 // have already set this flag. 
3430 assert builder.hasMoreResultsInRegion(); 3431 // we only set moreResults to false in the above code, so set it to true if we haven't set it 3432 // yet. 3433 if (!builder.hasMoreResults()) { 3434 builder.setMoreResults(true); 3435 } 3436 if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) { 3437 // Record the last cell of the last result if it is a partial result 3438 // We need this to calculate the complete rows we have returned to client as the 3439 // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the 3440 // current row. We may filter out all the remaining cells for the current row and just 3441 // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to 3442 // check for row change. 3443 Result lastResult = results.get(results.size() - 1); 3444 if (lastResult.mayHaveMoreCellsInRow()) { 3445 rsh.rowOfLastPartialResult = lastResult.getRow(); 3446 } else { 3447 rsh.rowOfLastPartialResult = null; 3448 } 3449 } 3450 if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { 3451 scannerClosed = true; 3452 closeScanner(region, scanner, scannerName, context); 3453 } 3454 return builder.build(); 3455 } catch (IOException e) { 3456 try { 3457 // scanner is closed here 3458 scannerClosed = true; 3459 // The scanner state might be left in a dirty state, so we will tell the Client to 3460 // fail this RPC and close the scanner while opening up another one from the start of 3461 // row that the client has last seen. 3462 closeScanner(region, scanner, scannerName, context); 3463 3464 // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is 3465 // used in two different semantics. 3466 // (1) The first is to close the client scanner and bubble up the exception all the way 3467 // to the application. This is preferred when the exception is really un-recoverable 3468 // (like CorruptHFileException, etc). 
Plain DoNotRetryIOException also falls into this 3469 // bucket usually. 3470 // (2) Second semantics is to close the current region scanner only, but continue the 3471 // client scanner by overriding the exception. This is usually UnknownScannerException, 3472 // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the 3473 // application-level ClientScanner has to continue without bubbling up the exception to 3474 // the client. See ClientScanner code to see how it deals with these special exceptions. 3475 if (e instanceof DoNotRetryIOException) { 3476 throw e; 3477 } 3478 3479 // If it is a FileNotFoundException, wrap as a 3480 // DoNotRetryIOException. This can avoid the retry in ClientScanner. 3481 if (e instanceof FileNotFoundException) { 3482 throw new DoNotRetryIOException(e); 3483 } 3484 3485 // We closed the scanner already. Instead of throwing the IOException, and client 3486 // retrying with the same scannerId only to get USE on the next RPC, we directly throw 3487 // a special exception to save an RPC. 3488 if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) { 3489 // 1.4.0+ clients know how to handle 3490 throw new ScannerResetException("Scanner is closed on the server-side", e); 3491 } else { 3492 // older clients do not know about SRE. Just throw USE, which they will handle 3493 throw new UnknownScannerException("Throwing UnknownScannerException to reset the client" 3494 + " scanner state for clients older than 1.3.", e); 3495 } 3496 } catch (IOException ioe) { 3497 throw new ServiceException(ioe); 3498 } 3499 } finally { 3500 if (!scannerClosed) { 3501 // Adding resets expiration time on lease. 3502 // the closeCallBack will be set in closeScanner so here we only care about shippedCallback 3503 if (context != null) { 3504 context.setCallBack(rsh.shippedCallback); 3505 } else { 3506 // When context != null, adding back the lease will be done in callback set above. 
3507 addScannerLeaseBack(lease); 3508 } 3509 } 3510 quota.close(); 3511 } 3512 } 3513 3514 private void closeScanner(HRegion region, RegionScanner scanner, String scannerName, 3515 RpcCallContext context) throws IOException { 3516 if (region.getCoprocessorHost() != null) { 3517 if (region.getCoprocessorHost().preScannerClose(scanner)) { 3518 // bypass the actual close. 3519 return; 3520 } 3521 } 3522 RegionScannerHolder rsh = scanners.remove(scannerName); 3523 if (rsh != null) { 3524 if (context != null) { 3525 context.setCallBack(rsh.closeCallBack); 3526 } else { 3527 rsh.s.close(); 3528 } 3529 if (region.getCoprocessorHost() != null) { 3530 region.getCoprocessorHost().postScannerClose(scanner); 3531 } 3532 closedScanners.put(scannerName, scannerName); 3533 } 3534 } 3535 3536 @Override 3537 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 3538 CoprocessorServiceRequest request) throws ServiceException { 3539 rpcPreCheck("execRegionServerService"); 3540 return regionServer.execRegionServerService(controller, request); 3541 } 3542 3543 @Override 3544 public UpdateConfigurationResponse updateConfiguration( 3545 RpcController controller, UpdateConfigurationRequest request) 3546 throws ServiceException { 3547 try { 3548 this.regionServer.updateConfiguration(); 3549 } catch (Exception e) { 3550 throw new ServiceException(e); 3551 } 3552 return UpdateConfigurationResponse.getDefaultInstance(); 3553 } 3554 3555 @Override 3556 public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots( 3557 RpcController controller, GetSpaceQuotaSnapshotsRequest request) throws ServiceException { 3558 try { 3559 final RegionServerSpaceQuotaManager manager = 3560 regionServer.getRegionServerSpaceQuotaManager(); 3561 final GetSpaceQuotaSnapshotsResponse.Builder builder = 3562 GetSpaceQuotaSnapshotsResponse.newBuilder(); 3563 if (manager != null) { 3564 final Map<TableName,SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots(); 3565 for 
(Entry<TableName,SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) { 3566 builder.addSnapshots(TableQuotaSnapshot.newBuilder() 3567 .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey())) 3568 .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())) 3569 .build()); 3570 } 3571 } 3572 return builder.build(); 3573 } catch (Exception e) { 3574 throw new ServiceException(e); 3575 } 3576 } 3577 3578 @Override 3579 public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, 3580 ClearRegionBlockCacheRequest request) { 3581 ClearRegionBlockCacheResponse.Builder builder = 3582 ClearRegionBlockCacheResponse.newBuilder(); 3583 CacheEvictionStatsBuilder stats = CacheEvictionStats.builder(); 3584 List<HRegion> regions = getRegions(request.getRegionList(), stats); 3585 for (HRegion region : regions) { 3586 try { 3587 stats = stats.append(this.regionServer.clearRegionBlockCache(region)); 3588 } catch (Exception e) { 3589 stats.addException(region.getRegionInfo().getRegionName(), e); 3590 } 3591 } 3592 stats.withMaxCacheSize(regionServer.getCacheConfig().getBlockCache().getMaxSize()); 3593 return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build(); 3594 } 3595 3596 @Override 3597 @QosPriority(priority = HConstants.ADMIN_QOS) 3598 public ExecuteProceduresResponse executeProcedures(RpcController controller, 3599 ExecuteProceduresRequest request) throws ServiceException { 3600 try { 3601 checkOpen(); 3602 regionServer.getRegionServerCoprocessorHost().preExecuteProcedures(); 3603 if (request.getOpenRegionCount() > 0) { 3604 for (OpenRegionRequest req : request.getOpenRegionList()) { 3605 openRegion(controller, req); 3606 } 3607 } 3608 if (request.getCloseRegionCount() > 0) { 3609 for (CloseRegionRequest req : request.getCloseRegionList()) { 3610 closeRegion(controller, req); 3611 } 3612 } 3613 if (request.getProcCount() > 0) { 3614 for (RemoteProcedureRequest req : request.getProcList()) { 3615 
RSProcedureCallable callable; 3616 try { 3617 callable = Class.forName(req.getProcClass()).asSubclass(RSProcedureCallable.class) 3618 .getDeclaredConstructor().newInstance(); 3619 } catch (Exception e) { 3620 regionServer.remoteProcedureComplete(req.getProcId(), e); 3621 continue; 3622 } 3623 callable.init(req.getProcData().toByteArray(), regionServer); 3624 regionServer.executeProcedure(req.getProcId(), callable); 3625 } 3626 } 3627 regionServer.getRegionServerCoprocessorHost().postExecuteProcedures(); 3628 return ExecuteProceduresResponse.getDefaultInstance(); 3629 } catch (IOException e) { 3630 throw new ServiceException(e); 3631 } 3632 } 3633 3634 @VisibleForTesting 3635 public RpcScheduler getRpcScheduler() { 3636 return rpcServer.getScheduler(); 3637 } 3638}