001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.lang.reflect.InvocationTargetException; 025import java.net.BindException; 026import java.net.InetSocketAddress; 027import java.net.UnknownHostException; 028import java.nio.ByteBuffer; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.NavigableMap; 038import java.util.Set; 039import java.util.TreeSet; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.TimeUnit; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.concurrent.atomic.AtomicLong; 045import java.util.concurrent.atomic.LongAdder; 046 047import org.apache.commons.lang3.mutable.MutableObject; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.fs.Path; 050import 
org.apache.hadoop.hbase.ByteBufferExtendedCell; 051import org.apache.hadoop.hbase.CacheEvictionStats; 052import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; 053import org.apache.hadoop.hbase.Cell; 054import org.apache.hadoop.hbase.CellScannable; 055import org.apache.hadoop.hbase.CellScanner; 056import org.apache.hadoop.hbase.CellUtil; 057import org.apache.hadoop.hbase.CompareOperator; 058import org.apache.hadoop.hbase.DoNotRetryIOException; 059import org.apache.hadoop.hbase.DroppedSnapshotException; 060import org.apache.hadoop.hbase.HBaseIOException; 061import org.apache.hadoop.hbase.HConstants; 062import org.apache.hadoop.hbase.MultiActionResultTooLarge; 063import org.apache.hadoop.hbase.NotServingRegionException; 064import org.apache.hadoop.hbase.PrivateCellUtil; 065import org.apache.hadoop.hbase.Server; 066import org.apache.hadoop.hbase.ServerName; 067import org.apache.hadoop.hbase.TableName; 068import org.apache.hadoop.hbase.UnknownScannerException; 069import org.apache.hadoop.hbase.client.Append; 070import org.apache.hadoop.hbase.client.ConnectionUtils; 071import org.apache.hadoop.hbase.client.Delete; 072import org.apache.hadoop.hbase.client.Durability; 073import org.apache.hadoop.hbase.client.Get; 074import org.apache.hadoop.hbase.client.Increment; 075import org.apache.hadoop.hbase.client.Mutation; 076import org.apache.hadoop.hbase.client.Put; 077import org.apache.hadoop.hbase.client.RegionInfo; 078import org.apache.hadoop.hbase.client.RegionReplicaUtil; 079import org.apache.hadoop.hbase.client.Result; 080import org.apache.hadoop.hbase.client.Row; 081import org.apache.hadoop.hbase.client.RowMutations; 082import org.apache.hadoop.hbase.client.Scan; 083import org.apache.hadoop.hbase.client.TableDescriptor; 084import org.apache.hadoop.hbase.client.VersionInfoUtil; 085import org.apache.hadoop.hbase.conf.ConfigurationObserver; 086import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; 087import 
org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 088import org.apache.hadoop.hbase.exceptions.ScannerResetException; 089import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 090import org.apache.hadoop.hbase.filter.ByteArrayComparable; 091import org.apache.hadoop.hbase.io.TimeRange; 092import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; 093import org.apache.hadoop.hbase.ipc.HBaseRpcController; 094import org.apache.hadoop.hbase.ipc.PriorityFunction; 095import org.apache.hadoop.hbase.ipc.QosPriority; 096import org.apache.hadoop.hbase.ipc.RpcCallContext; 097import org.apache.hadoop.hbase.ipc.RpcCallback; 098import org.apache.hadoop.hbase.ipc.RpcServer; 099import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 100import org.apache.hadoop.hbase.ipc.RpcServerFactory; 101import org.apache.hadoop.hbase.ipc.RpcServerInterface; 102import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 103import org.apache.hadoop.hbase.ipc.ServerRpcController; 104import org.apache.hadoop.hbase.log.HBaseMarkers; 105import org.apache.hadoop.hbase.master.MasterRpcServices; 106import org.apache.hadoop.hbase.net.Address; 107import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; 108import org.apache.hadoop.hbase.quotas.OperationQuota; 109import org.apache.hadoop.hbase.quotas.QuotaUtil; 110import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; 111import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; 112import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; 113import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; 114import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; 115import org.apache.hadoop.hbase.regionserver.Leases.Lease; 116import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; 117import org.apache.hadoop.hbase.regionserver.Region.Operation; 118import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; 119import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 120import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; 121import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; 122import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; 123import org.apache.hadoop.hbase.security.Superusers; 124import org.apache.hadoop.hbase.security.User; 125import org.apache.hadoop.hbase.security.access.AccessChecker; 126import org.apache.hadoop.hbase.security.access.Permission; 127import org.apache.hadoop.hbase.util.Bytes; 128import org.apache.hadoop.hbase.util.CollectionUtils; 129import org.apache.hadoop.hbase.util.DNS; 130import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 131import org.apache.hadoop.hbase.util.Pair; 132import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 133import org.apache.hadoop.hbase.util.Strings; 134import org.apache.hadoop.hbase.wal.WAL; 135import org.apache.hadoop.hbase.wal.WALEdit; 136import org.apache.hadoop.hbase.wal.WALKey; 137import org.apache.hadoop.hbase.wal.WALSplitter; 138import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 139import org.apache.yetus.audience.InterfaceAudience; 140import org.slf4j.Logger; 141import org.slf4j.LoggerFactory; 142 143import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 144import org.apache.hbase.thirdparty.com.google.common.cache.Cache; 145import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 146import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 147import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 148import org.apache.hbase.thirdparty.com.google.protobuf.Message; 149import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 150import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 151import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 152import 
org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 153import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 154import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 155import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 168import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest; 
174import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse; 179import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 180import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 181import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 182import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 183import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 184import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 185import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; 186import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; 187import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; 188import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; 189import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; 190import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; 191import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; 192import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; 193import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 194import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest; 195import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse; 196import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 197import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action; 198import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 199import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; 200import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 201import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 202import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 203import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 204import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition; 205import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 206import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 207import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 208import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 209import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRegionLoadStats; 210import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 211import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 212import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 213import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 214import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 215import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 216import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 217import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 218import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; 219import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 220import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 221import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 222import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 223import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 224import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; 225import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; 226import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; 227import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 228import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 229import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMetrics; 230import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; 231import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; 232import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; 233import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 234import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 235import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 236import 
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;

/**
 * Implements the regionserver RPC services.
 */
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class RSRpcServices implements HBaseRPCErrorHandler,
  AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
  ConfigurationObserver {
  protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);

  /** RPC scheduler to use for the region server. Names the scheduler factory class. */
  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
      "hbase.region.server.rpc.scheduler.factory.class";

  /**
   * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
   * configuration exists to prevent the scenario where a time limit is specified to be so
   * restrictive that the time limit is reached immediately (before any cells are scanned).
   */
  private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
      "hbase.region.server.rpc.minimum.scan.time.limit.delta";
  /**
   * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
   */
  private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

  /**
   * Number of rows in a batch operation above which a warning will be logged.
   */
  static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
  /**
   * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
   */
  static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;

  // Config key toggling the RPC server's byte-buffer reservoir; the value is read elsewhere
  // (not in this chunk).
  protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";

  // Request counter. (Includes requests that are not serviced by regions.)
  // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
  final LongAdder requestCount = new LongAdder();

  // Request counter for rpc get
  final LongAdder rpcGetRequestCount = new LongAdder();

  // Request counter for rpc scan
  final LongAdder rpcScanRequestCount = new LongAdder();

  // Request counter for rpc multi
  final LongAdder rpcMultiRequestCount = new LongAdder();

  // Request counter for rpc mutate
  final LongAdder rpcMutateRequestCount = new LongAdder();

  // Server to handle client requests.
  final RpcServerInterface rpcServer;
  // Address the RPC server is bound to.
  final InetSocketAddress isa;

  private final HRegionServer regionServer;
  // Maximum cumulative result size for a single scan RPC, in bytes; assigned in the constructor
  // (not visible in this chunk) — presumably from the scanner max-result-size config. TODO confirm.
  private final long maxScannerResultSize;

  // The reference to the priority extraction function
  private final PriorityFunction priority;

  // Produces the unique ids under which open client scanners are registered in 'scanners'.
  private ScannerIdGenerator scannerIdGenerator;
  // All currently open client scanners, keyed by scanner name (id).
  private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
  // which may send next or close request to a region scanner which has already been exhausted. The
  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
  private final Cache<String, String> closedScanners;

  /**
   * The lease timeout period for client scanners (milliseconds).
   */
  private final int scannerLeaseTimeoutPeriod;

  /**
   * The RPC timeout period (milliseconds).
   */
  private final int rpcTimeout;

  /**
   * The minimum allowable delta to use for the scan limit.
   */
  private final long minimumScanTimeLimitDelta;

  /**
   * Row size threshold for multi requests above which a warning is logged.
   */
  private final int rowSizeWarnThreshold;

  // Set while a clear-compaction-queues request is in flight; used as a simple mutual-exclusion
  // flag for that operation.
  final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false);

  // We want to vet all accesses at the point of entry itself; limiting scope of access checker
  // instance to only this class to prevent its use from spreading deeper into implementation.
  // Initialized in start() since AccessChecker needs ZKWatcher which is created by HRegionServer
  // after RSRpcServices constructor and before start() is called.
  // Initialized only if authorization is enabled, else remains null.
  protected AccessChecker accessChecker;

  /**
   * Services launched in RSRpcServices. By default they are on but you can use the below
   * booleans to selectively enable/disable either Admin or Client Service (Rare is the case
   * where you would ever turn off one or the other).
   */
  public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
      "hbase.regionserver.admin.executorService";
  public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
      "hbase.regionserver.client.executorService";

  /**
   * An Rpc callback for closing a RegionScanner.
   */
  private static final class RegionScannerCloseCallBack implements RpcCallback {

    private final RegionScanner scanner;

    public RegionScannerCloseCallBack(RegionScanner scanner) {
      this.scanner = scanner;
    }

    @Override
    public void run() throws IOException {
      this.scanner.close();
    }
  }

  /**
   * An Rpc callback for doing shipped() call on a RegionScanner.
   */
  private class RegionScannerShippedCallBack implements RpcCallback {

    private final String scannerName;
    private final Shipper shipper;
    private final Lease lease;

    public RegionScannerShippedCallBack(String scannerName, Shipper shipper, Lease lease) {
      this.scannerName = scannerName;
      this.shipper = shipper;
      this.lease = lease;
    }

    @Override
    public void run() throws IOException {
      this.shipper.shipped();
      // We're done. On way out re-add the above removed lease. The lease was temp removed for this
      // Rpc call and we are at end of the call now. Time to add it back.
      // Only re-add when the scanner is still registered; a concurrently closed/expired scanner
      // must not get its lease resurrected.
      if (scanners.containsKey(scannerName)) {
        if (lease != null) regionServer.leases.addLease(lease);
      }
    }
  }

  /**
   * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on
   * completion of multiGets.
   */
  static class RegionScannersCloseCallBack implements RpcCallback {
    private final List<RegionScanner> scanners = new ArrayList<>();

    public void addScanner(RegionScanner scanner) {
      this.scanners.add(scanner);
    }

    @Override
    public void run() {
      for (RegionScanner scanner : scanners) {
        try {
          scanner.close();
        } catch (IOException e) {
          // Best-effort cleanup: log and keep closing the remaining scanners.
          LOG.error("Exception while closing the scanner " + scanner, e);
        }
      }
    }
  }

  /**
   * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
   */
  private static final class RegionScannerHolder {

    // Sequence number expected on the next scan call; guards against duplicate/replayed requests.
    private final AtomicLong nextCallSeq = new AtomicLong(0);
    private final String scannerName;
    private final RegionScanner s;
    private final HRegion r;
    private final RpcCallback closeCallBack;
    private final RpcCallback shippedCallback;
    // Row of the last partial result returned, if any; used to stitch partial rows together.
    private byte[] rowOfLastPartialResult;
    // Whether the client asked for scan cursors to be returned.
    private boolean needCursor;

    public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r,
        RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) {
      this.scannerName = scannerName;
      this.s = s;
      this.r = r;
      this.closeCallBack = closeCallBack;
      this.shippedCallback = shippedCallback;
      this.needCursor = needCursor;
    }

    public long getNextCallSeq() {
      return nextCallSeq.get();
    }

    public boolean incNextCallSeq(long currentSeq) {
      // Use CAS to prevent multiple scan request running on the same scanner.
      return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
    }
  }

  /**
   * Instantiated as a scanner lease.
If the lease times out, the scanner is 449 * closed 450 */ 451 private class ScannerListener implements LeaseListener { 452 private final String scannerName; 453 454 ScannerListener(final String n) { 455 this.scannerName = n; 456 } 457 458 @Override 459 public void leaseExpired() { 460 RegionScannerHolder rsh = scanners.remove(this.scannerName); 461 if (rsh != null) { 462 RegionScanner s = rsh.s; 463 LOG.info("Scanner " + this.scannerName + " lease expired on region " 464 + s.getRegionInfo().getRegionNameAsString()); 465 HRegion region = null; 466 try { 467 region = regionServer.getRegion(s.getRegionInfo().getRegionName()); 468 if (region != null && region.getCoprocessorHost() != null) { 469 region.getCoprocessorHost().preScannerClose(s); 470 } 471 } catch (IOException e) { 472 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e); 473 } finally { 474 try { 475 s.close(); 476 if (region != null && region.getCoprocessorHost() != null) { 477 region.getCoprocessorHost().postScannerClose(s); 478 } 479 } catch (IOException e) { 480 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e); 481 } 482 } 483 } else { 484 LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" + 485 " scanner found, hence no chance to close that related scanner!"); 486 } 487 } 488 } 489 490 private static ResultOrException getResultOrException(final ClientProtos.Result r, 491 final int index){ 492 return getResultOrException(ResponseConverter.buildActionResult(r), index); 493 } 494 495 private static ResultOrException getResultOrException(final Exception e, final int index) { 496 return getResultOrException(ResponseConverter.buildActionResult(e), index); 497 } 498 499 private static ResultOrException getResultOrException( 500 final ResultOrException.Builder builder, final int index) { 501 return builder.setIndex(index).build(); 502 } 503 504 /** 505 * Checks for the following pre-checks in order: 506 * <ol> 507 * 
<li>RegionServer is running</li> 508 * <li>If authorization is enabled, then RPC caller has ADMIN permissions</li> 509 * </ol> 510 * @param requestName name of rpc request. Used in reporting failures to provide context. 511 * @throws ServiceException If any of the above listed pre-check fails. 512 */ 513 private void rpcPreCheck(String requestName) throws ServiceException { 514 try { 515 checkOpen(); 516 requirePermission(requestName, Permission.Action.ADMIN); 517 } catch (IOException ioe) { 518 throw new ServiceException(ioe); 519 } 520 } 521 522 /** 523 * Starts the nonce operation for a mutation, if needed. 524 * @param mutation Mutation. 525 * @param nonceGroup Nonce group from the request. 526 * @return whether to proceed this mutation. 527 */ 528 private boolean startNonceOperation(final MutationProto mutation, long nonceGroup) 529 throws IOException { 530 if (regionServer.nonceManager == null || !mutation.hasNonce()) return true; 531 boolean canProceed = false; 532 try { 533 canProceed = regionServer.nonceManager.startOperation( 534 nonceGroup, mutation.getNonce(), regionServer); 535 } catch (InterruptedException ex) { 536 throw new InterruptedIOException("Nonce start operation interrupted"); 537 } 538 return canProceed; 539 } 540 541 /** 542 * Ends nonce operation for a mutation, if needed. 543 * @param mutation Mutation. 544 * @param nonceGroup Nonce group from the request. Always 0 in initial implementation. 545 * @param success Whether the operation for this nonce has succeeded. 
   */
  private void endNonceOperation(final MutationProto mutation,
      long nonceGroup, boolean success) {
    // Only report back to the nonce manager if one exists and the mutation actually carried
    // a nonce (mirrors the guard in startNonceOperation).
    if (regionServer.nonceManager != null && mutation.hasNonce()) {
      regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
    }
  }

  // True if the client on the other end of this call can accept cell blocks (cells shipped
  // alongside the protobuf message rather than encoded inside it).
  private boolean isClientCellBlockSupport(RpcCallContext context) {
    return context != null && context.isClientCellBlockSupported();
  }

  /**
   * Copies {@code result} into the mutate response. When the client supports cell blocks, only
   * result metadata goes into the protobuf and the cell payload is shipped through the
   * controller's cell scanner, avoiding a full protobuf conversion of the cells.
   */
  private void addResult(final MutateResponse.Builder builder, final Result result,
      final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
    if (result == null) return;
    if (clientCellBlockSupported) {
      builder.setResult(ProtobufUtil.toResultNoData(result));
      rpcc.setCellScanner(result.cellScanner());
    } else {
      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
      builder.setResult(pbr);
    }
  }

  /**
   * Copies scan {@code results} into the scan response. Results are flagged stale when served by
   * a non-default region replica. As with {@code addResult}, cell blocks are used when the client
   * supports them: per-result cell counts and partial flags go in the protobuf while the cells
   * themselves ride the controller's cell scanner.
   */
  private void addResults(ScanResponse.Builder builder, List<Result> results,
      HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
    builder.setStale(!isDefaultRegion);
    if (results.isEmpty()) {
      return;
    }
    if (clientCellBlockSupported) {
      for (Result res : results) {
        builder.addCellsPerResult(res.size());
        builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
      }
      controller.setCellScanner(CellUtil.createCellScanner(results));
    } else {
      for (Result res : results) {
        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
        builder.addResults(pbr);
      }
    }
  }

  /**
   * Mutate a list of rows atomically.
   * @param cellScanner if non-null, the mutation data -- the Cell content.
   * @return the value returned by {@code HRegion#checkAndRowMutate} for the assembled mutations
   *         (whether the condition check passed and the mutations were applied).
   * @throws IOException if an action is not a Put/Delete or the region operation fails
   */
  private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
    final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
    ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder,
    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
    // Number of mutations whose cells have been fully consumed off the cellScanner; the finally
    // block below skips cells for the remaining actions so the scanner stays positioned correctly
    // for the caller even when we throw part way through.
    int countOfCompleteMutation = 0;
    try {
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }
      RowMutations rm = null;
      int i = 0;
      ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
          ClientProtos.ResultOrException.newBuilder();
      for (ClientProtos.Action action: actions) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
            action.getGet());
        }
        MutationType type = action.getMutation().getMutateType();
        if (rm == null) {
          // Lazily create the RowMutations from the first action's row; all actions in this
          // atomic batch target the same row.
          rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
        }
        switch (type) {
          case PUT:
            Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
            ++countOfCompleteMutation;
            checkCellSizeLimit(region, put);
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
            rm.add(put);
            break;
          case DELETE:
            Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
            ++countOfCompleteMutation;
            spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
            rm.add(del);
            break;
          default:
            throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
        }
        // To unify the response format with doNonAtomicRegionMutation and read through client's
        // AsyncProcess we have to add an empty result instance per operation
        resultOrExceptionOrBuilder.clear();
        resultOrExceptionOrBuilder.setIndex(i++);
        builder.addResultOrException(
            resultOrExceptionOrBuilder.build());
      }
      return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
    } finally {
      // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
      // even if the malformed cells are not skipped.
      for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
        skipCellsForMutation(actions.get(i), cellScanner);
      }
    }
  }

  /**
   * Execute an append mutation.
   *
   * @param region the region to append to
   * @param quota operation quota the append is charged against
   * @param mutation the append mutation from the request
   * @param cellScanner if non-null, carries the Cell content for the mutation
   * @param nonceGroup nonce group from the request, used to dedupe retried appends
   * @param spaceQuota active space-quota policies to check the append against
   * @return result to return to client if default operation should be
   *   bypassed as indicated by RegionObserver, null otherwise
   * @throws IOException on append failure or quota/cell-size violation
   */
  private Result append(final HRegion region, final OperationQuota quota,
      final MutationProto mutation, final CellScanner cellScanner, long nonceGroup,
      ActivePolicyEnforcement spaceQuota)
      throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
    checkCellSizeLimit(region, append);
    spaceQuota.getPolicyEnforcement(region).check(append);
    quota.addMutation(append);
    Result r = null;
    if (region.getCoprocessorHost() != null) {
      // A non-null result from preAppend means a coprocessor bypassed the default operation.
      r = region.getCoprocessorHost().preAppend(append);
    }
    if (r == null) {
      boolean canProceed = startNonceOperation(mutation, nonceGroup);
      boolean success = false;
      try {
        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
        if (canProceed) {
          r = region.append(append, nonceGroup, nonce);
        } else {
          // convert duplicate append to get
          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false,
            nonceGroup, nonce);
          r = Result.create(results);
        }
        success = true;
      } finally {
        // Report the outcome to the nonce manager so retries are handled correctly.
        if (canProceed) {
          endNonceOperation(mutation, nonceGroup, success);
        }
      }
      if (region.getCoprocessorHost() != null) {
        r = region.getCoprocessorHost().postAppend(append, r);
      }
    }
    if (regionServer.metricsRegionServer != null) {
      regionServer.metricsRegionServer.updateAppend(
        region.getTableDescriptor().getTableName(),
        EnvironmentEdgeManager.currentTime() - before);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Execute an increment mutation.
   *
   * @param region the region to increment against
   * @param quota operation quota the increment is charged against
   * @param mutation the increment mutation from the request
   * @param cells if non-null, carries the Cell content for the mutation
   * @param nonceGroup nonce group from the request, used to dedupe retried increments
   * @param spaceQuota active space-quota policies to check the increment against
   * @return the Result (empty result if the region/coprocessor produced none)
   * @throws IOException on increment failure or quota/cell-size violation
   */
  private Result increment(final HRegion region, final OperationQuota quota,
      final MutationProto mutation, final CellScanner cells, long nonceGroup,
      ActivePolicyEnforcement spaceQuota)
      throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
    checkCellSizeLimit(region, increment);
    spaceQuota.getPolicyEnforcement(region).check(increment);
    quota.addMutation(increment);
    Result r = null;
    if (region.getCoprocessorHost() != null) {
      // A non-null result from preIncrement means a coprocessor bypassed the default operation.
      r = region.getCoprocessorHost().preIncrement(increment);
    }
    if (r == null) {
      boolean canProceed = startNonceOperation(mutation, nonceGroup);
      boolean success = false;
      try {
        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
        if (canProceed) {
          r = region.increment(increment, nonceGroup, nonce);
        } else {
          // convert duplicate increment to get
          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup,
            nonce);
          r = Result.create(results);
        }
        success = true;
      } finally {
        // Report the outcome to the nonce manager so retries are handled correctly.
        if (canProceed) {
          endNonceOperation(mutation, nonceGroup, success);
        }
      }
      if (region.getCoprocessorHost() != null) {
        r = region.getCoprocessorHost().postIncrement(increment, r);
      }
    }
    if (regionServer.metricsRegionServer != null) {
      regionServer.metricsRegionServer.updateIncrement(
        region.getTableDescriptor().getTableName(),
        EnvironmentEdgeManager.currentTime() - before);
    }
    return r == null ? Result.EMPTY_RESULT : r;
  }

  /**
   * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
   * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
   *  method returns as a 'result'.
   * @param closeCallBack the callback to be used with multigets
   * @param context the current RpcCallContext
   * @return Return the <code>cellScanner</code> passed
   */
  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
      final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
      final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
      final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
      ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
    // one at a time, we instead pass them in batch.
Be aware that the corresponding 768 // ResultOrException instance that matches each Put or Delete is then added down in the 769 // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are 770 // deferred/batched 771 List<ClientProtos.Action> mutations = null; 772 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 773 IOException sizeIOE = null; 774 Object lastBlock = null; 775 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); 776 boolean hasResultOrException = false; 777 for (ClientProtos.Action action : actions.getActionList()) { 778 hasResultOrException = false; 779 resultOrExceptionBuilder.clear(); 780 try { 781 Result r = null; 782 783 if (context != null 784 && context.isRetryImmediatelySupported() 785 && (context.getResponseCellSize() > maxQuotaResultSize 786 || context.getResponseBlockSize() + context.getResponseExceptionSize() 787 > maxQuotaResultSize)) { 788 789 // We're storing the exception since the exception and reason string won't 790 // change after the response size limit is reached. 791 if (sizeIOE == null ) { 792 // We don't need the stack un-winding do don't throw the exception. 793 // Throwing will kill the JVM's JIT. 794 // 795 // Instead just create the exception and then store it. 796 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" 797 + " CellSize: " + context.getResponseCellSize() 798 + " BlockSize: " + context.getResponseBlockSize()); 799 800 // Only report the exception once since there's only one request that 801 // caused the exception. Otherwise this number will dominate the exceptions count. 802 rpcServer.getMetrics().exception(sizeIOE); 803 } 804 805 // Now that there's an exception is known to be created 806 // use it for the response. 807 // 808 // This will create a copy in the builder. 
809 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); 810 resultOrExceptionBuilder.setException(pair); 811 context.incrementResponseExceptionSize(pair.getSerializedSize()); 812 resultOrExceptionBuilder.setIndex(action.getIndex()); 813 builder.addResultOrException(resultOrExceptionBuilder.build()); 814 skipCellsForMutation(action, cellScanner); 815 continue; 816 } 817 if (action.hasGet()) { 818 long before = EnvironmentEdgeManager.currentTime(); 819 ClientProtos.Get pbGet = action.getGet(); 820 // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do 821 // a get closest before. Throwing the UnknownProtocolException signals it that it needs 822 // to switch and do hbase2 protocol (HBase servers do not tell clients what versions 823 // they are; its a problem for non-native clients like asynchbase. HBASE-20225. 824 if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) { 825 throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? 
" + 826 "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " + 827 "reverse Scan."); 828 } 829 try { 830 Get get = ProtobufUtil.toGet(pbGet); 831 if (context != null) { 832 r = get(get, (region), closeCallBack, context); 833 } else { 834 r = region.get(get); 835 } 836 } finally { 837 if (regionServer.metricsRegionServer != null) { 838 regionServer.metricsRegionServer.updateGet( 839 region.getTableDescriptor().getTableName(), 840 EnvironmentEdgeManager.currentTime() - before); 841 } 842 } 843 } else if (action.hasServiceCall()) { 844 hasResultOrException = true; 845 com.google.protobuf.Message result = 846 execServiceOnRegion(region, action.getServiceCall()); 847 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = 848 ClientProtos.CoprocessorServiceResult.newBuilder(); 849 resultOrExceptionBuilder.setServiceResult( 850 serviceResultBuilder.setValue( 851 serviceResultBuilder.getValueBuilder() 852 .setName(result.getClass().getName()) 853 // TODO: Copy!!! 854 .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); 855 } else if (action.hasMutation()) { 856 MutationType type = action.getMutation().getMutateType(); 857 if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && 858 !mutations.isEmpty()) { 859 // Flush out any Puts or Deletes already collected. 
860 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, 861 spaceQuotaEnforcement); 862 mutations.clear(); 863 } 864 switch (type) { 865 case APPEND: 866 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, 867 spaceQuotaEnforcement); 868 break; 869 case INCREMENT: 870 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, 871 spaceQuotaEnforcement); 872 break; 873 case PUT: 874 case DELETE: 875 // Collect the individual mutations and apply in a batch 876 if (mutations == null) { 877 mutations = new ArrayList<>(actions.getActionCount()); 878 } 879 mutations.add(action); 880 break; 881 default: 882 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); 883 } 884 } else { 885 throw new HBaseIOException("Unexpected Action type"); 886 } 887 if (r != null) { 888 ClientProtos.Result pbResult = null; 889 if (isClientCellBlockSupport(context)) { 890 pbResult = ProtobufUtil.toResultNoData(r); 891 // Hard to guess the size here. Just make a rough guess. 892 if (cellsToReturn == null) { 893 cellsToReturn = new ArrayList<>(); 894 } 895 cellsToReturn.add(r); 896 } else { 897 pbResult = ProtobufUtil.toResult(r); 898 } 899 lastBlock = addSize(context, r, lastBlock); 900 hasResultOrException = true; 901 resultOrExceptionBuilder.setResult(pbResult); 902 } 903 // Could get to here and there was no result and no exception. Presumes we added 904 // a Put or Delete to the collecting Mutations List for adding later. In this 905 // case the corresponding ResultOrException instance for the Put or Delete will be added 906 // down in the doNonAtomicBatchOp method call rather than up here. 
907 } catch (IOException ie) { 908 rpcServer.getMetrics().exception(ie); 909 hasResultOrException = true; 910 NameBytesPair pair = ResponseConverter.buildException(ie); 911 resultOrExceptionBuilder.setException(pair); 912 context.incrementResponseExceptionSize(pair.getSerializedSize()); 913 } 914 if (hasResultOrException) { 915 // Propagate index. 916 resultOrExceptionBuilder.setIndex(action.getIndex()); 917 builder.addResultOrException(resultOrExceptionBuilder.build()); 918 } 919 } 920 // Finish up any outstanding mutations 921 if (!CollectionUtils.isEmpty(mutations)) { 922 doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); 923 } 924 return cellsToReturn; 925 } 926 927 private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException { 928 if (r.maxCellSize > 0) { 929 CellScanner cells = m.cellScanner(); 930 while (cells.advance()) { 931 int size = PrivateCellUtil.estimatedSerializedSizeOf(cells.current()); 932 if (size > r.maxCellSize) { 933 String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes"; 934 if (LOG.isDebugEnabled()) { 935 LOG.debug(msg); 936 } 937 throw new DoNotRetryIOException(msg); 938 } 939 } 940 } 941 } 942 943 private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region, 944 final OperationQuota quota, final List<ClientProtos.Action> mutations, 945 final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) 946 throws IOException { 947 // Just throw the exception. The exception will be caught and then added to region-level 948 // exception for RegionAction. Leaving the null to action result is ok since the null 949 // result is viewed as failure by hbase client. And the region-lever exception will be used 950 // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and 951 // AsyncBatchRpcRetryingCaller#onComplete for more details. 
    doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
  }

  /**
   * Execute a batch of mutations non-atomically; on failure, record the exception against each
   * individual action rather than the whole RegionAction.
   */
  private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
      final OperationQuota quota, final List<ClientProtos.Action> mutations,
      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
    try {
      doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
    } catch (IOException e) {
      // Set the exception for each action. The mutations in the same RegionAction are grouped
      // into different batches and then processed individually. Hence, we don't set the
      // region-level exception here for the whole RegionAction.
      for (Action mutation : mutations) {
        builder.addResultOrException(getResultOrException(e, mutation.getIndex()));
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations.
   *
   * @param builder the response builder a ResultOrException is added to per mutation
   * @param region the region the mutations are applied to
   * @param mutations the actions carrying the Put/Delete mutations
   * @param cells the cell scanner backing the mutations' cell data
   * @param atomic whether the batch must be applied all-or-nothing (preserves client order)
   */
  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
      final OperationQuota quota, final List<ClientProtos.Action> mutations,
      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
      throws IOException {
    Mutation[] mArray = new Mutation[mutations.size()];
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      /** HBASE-17924
       * mutationActionMap is a map to map the relation between mutations and actions
       * since the mutation array may have been reordered. In order to return the right
       * result or exception to the corresponding actions, we need to know which action
       * the mutation belongs to. We can't sort the ClientProtos.Action array, since they
       * are bonded to cellscanners.
       */
      Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
      int i = 0;
      for (ClientProtos.Action action: mutations) {
        if (action.hasGet()) {
          throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
            action.getGet());
        }
        MutationProto m = action.getMutation();
        Mutation mutation;
        if (m.getMutateType() == MutationType.PUT) {
          mutation = ProtobufUtil.toPut(m, cells);
          batchContainsPuts = true;
        } else {
          mutation = ProtobufUtil.toDelete(m, cells);
          batchContainsDelete = true;
        }
        mutationActionMap.put(mutation, action);
        mArray[i++] = mutation;
        checkCellSizeLimit(region, mutation);
        // Check if a space quota disallows this mutation
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
        quota.addMutation(mutation);
      }

      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }

      // HBASE-17924
      // Sort to improve lock efficiency for non-atomic batch of operations. If atomic
      // order is preserved as its expected from the client
      if (!atomic) {
        Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
      }

      OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
        HConstants.NO_NONCE);
      for (i = 0; i < codes.length; i++) {
        Mutation currentMutation = mArray[i];
        ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
        // For a sorted (non-atomic) batch the array index no longer matches the client's action
        // index, so use the action's own index when present.
        int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i;
        Exception e = null;
        switch (codes[i].getOperationStatusCode()) {
          case BAD_FAMILY:
            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SANITY_CHECK_FAILURE:
            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          default:
            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
            builder.addResultOrException(getResultOrException(e, index));
            break;

          case SUCCESS:
            builder.addResultOrException(getResultOrException(
                ClientProtos.Result.getDefaultInstance(), index));
            break;
        }
      }
    } finally {
      int processedMutationIndex = 0;
      for (Action mutation : mutations) {
        // The non-null mArray[i] means the cell scanner has been read.
        if (mArray[processedMutationIndex++] == null) {
          skipCellsForMutation(mutation, cells);
        }
      }
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  /**
   * Record put/delete batch latency metrics for the given region's table.
   *
   * @param starttime the batch start timestamp; elapsed time is computed against "now"
   */
  private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
      boolean batchContainsDelete) {
    if (regionServer.metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        regionServer.metricsRegionServer
            .updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime);
      }
      if (batchContainsDelete) {
        regionServer.metricsRegionServer
            .updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime);
      }
    }
  }

  /**
   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
   * @param region the region to replay into
   * @param mutations the WAL entries to replay; meta-marker entries are handled here and removed
   * @param replaySeqId sequence id of the replayed edits
   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
   *   exceptionMessage if any
   * @throws IOException
   */
  private OperationStatus [] doReplayBatchOp(final HRegion region,
      final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
    long before = EnvironmentEdgeManager.currentTime();
    boolean batchContainsPuts = false, batchContainsDelete = false;
    try {
      for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
        WALSplitter.MutationReplay m = it.next();

        if (m.type == MutationType.PUT) {
          batchContainsPuts = true;
        } else {
          batchContainsDelete = true;
        }

        // Entries carrying cells in METAFAMILY are markers (compaction/flush/region-event/
        // bulk-load), not data: dispatch them to the matching replay handler and drop the entry.
        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
        if (metaCells != null && !metaCells.isEmpty()) {
          for (Cell metaCell : metaCells) {
            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
            boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
            HRegion hRegion = region;
            if (compactionDesc != null) {
              // replay the compaction. Remove the files from stores only if we are the primary
              // region replica (thus own the files)
              hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
                replaySeqId);
              continue;
            }
            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
            if (flushDesc != null && !isDefaultReplica) {
              hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
              continue;
            }
            RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
            if (regionEvent != null && !isDefaultReplica) {
              hRegion.replayWALRegionEventMarker(regionEvent);
              continue;
            }
            BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
            if (bulkLoadEvent != null) {
              hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
              continue;
            }
          }
          it.remove();
        }
      }
      requestCount.increment();
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }
      return region.batchReplay(mutations.toArray(
        new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
    } finally {
      updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
    }
  }

  private void closeAllScanners() {
    // Close any outstanding scanners. Means they'll get an UnknownScanner
    // exception next time they come in.
    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
      try {
        e.getValue().s.close();
      } catch (IOException ioe) {
        // Best effort: log and keep closing the remaining scanners.
        LOG.warn("Closing scanner " + e.getKey(), ioe);
      }
    }
  }

  // Exposed for testing
  interface LogDelegate {
    void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold);
  }

  // Default delegate: warn when a multi request exceeds the configured row-count threshold.
  private static LogDelegate DEFAULT_LOG_DELEGATE = new LogDelegate() {
    @Override
    public void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
            + ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
            + RpcServer.getRequestUserName().orElse(null) + "/"
            + RpcServer.getRemoteAddress().orElse(null)
            + " first region in multi=" + firstRegionName);
      }
    }
  };

  private final LogDelegate ld;

  public RSRpcServices(HRegionServer rs) throws IOException {
    this(rs, DEFAULT_LOG_DELEGATE);
  }

  // Directly invoked only for testing
  RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
    this.ld = ld;
    regionServer = rs;
    rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
    RpcSchedulerFactory rpcSchedulerFactory;
    try {
      Class<?> cls = rs.conf.getClass(
          REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
          SimpleRpcSchedulerFactory.class);
      rpcSchedulerFactory = cls.asSubclass(RpcSchedulerFactory.class)
          .getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException | InvocationTargetException |
        InstantiationException | IllegalAccessException e) {
      // A misconfigured scheduler factory class is a startup configuration error.
      throw new IllegalArgumentException(e);
    }
    // Server to handle client requests.
    InetSocketAddress initialIsa;
    InetSocketAddress bindAddress;
    if(this instanceof MasterRpcServices) {
      String hostname = getHostname(rs.conf, true);
      int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
    } else {
      String hostname = getHostname(rs.conf, false);
      int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
        HConstants.DEFAULT_REGIONSERVER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(
        rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
    }
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = createPriority();
    // Using Address means we don't get the IP too. Shorten it more even to just the host name
    // w/o the domain.
    String name = rs.getProcessName() + "/" +
        Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
    // Set how many times to retry talking to another server over Connection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
    rpcServer = createRpcServer(rs, rs.conf, rpcSchedulerFactory, bindAddress, name);
    rpcServer.setRsRpcServices(this);
    scannerLeaseTimeoutPeriod = rs.conf.getInt(
        HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = rs.conf.getLong(
        HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
        HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout = rs.conf.getInt(
        HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = rs.conf.getLong(
        REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
        DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);

    InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, however we need the final port that was given to rpcServer
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);
    rs.setName(name);

    closedScanners = CacheBuilder.newBuilder()
        .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
  }

  /**
   * Create the RPC server instance this service runs on.
   *
   * @param bindAddress the resolved address/port to bind
   * @throws IOException if the port cannot be bound
   */
  protected RpcServerInterface createRpcServer(Server server, Configuration conf,
      RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)
      throws IOException {
    boolean reservoirEnabled = conf.getBoolean(RESERVOIR_ENABLED_KEY, true);
    try {
      return RpcServerFactory.createRpcServer(server, name, getServices(),
          bindAddress, // use final bindAddress for this server.
          conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
    } catch (BindException be) {
      throw new IOException(be.getMessage() + ". To switch ports use the '"
          + HConstants.REGIONSERVER_PORT + "' configuration property.",
          be.getCause() != null ? be.getCause() : be);
    }
  }

  @Override
  public void onConfigurationChange(Configuration newConf) {
    if (rpcServer instanceof ConfigurationObserver) {
      ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
    }
  }

  protected PriorityFunction createPriority() {
    return new AnnotationReadingPriorityFunction(this);
  }

  /**
   * Check the current RPC user has the given permission; no-op when authorization is disabled.
   */
  protected void requirePermission(String request, Permission.Action perm) throws IOException {
    if (accessChecker != null) {
      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, perm);
    }
  }


  /**
   * Resolve the hostname this server should use: the explicitly configured name if set,
   * otherwise a reverse-DNS lookup using the configured interface/nameserver.
   */
  public static String getHostname(Configuration conf, boolean isMaster)
      throws UnknownHostException {
    String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
      HRegionServer.RS_HOSTNAME_KEY);
    if (hostname == null || hostname.isEmpty()) {
      String masterOrRS = isMaster ? "master" : "regionserver";
      return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
        conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
        conf.get("hbase."
+ masterOrRS + ".dns.nameserver", "default"))); 1301 } else { 1302 LOG.info("hostname is configured to be " + hostname); 1303 return hostname; 1304 } 1305 } 1306 1307 @VisibleForTesting 1308 public int getScannersCount() { 1309 return scanners.size(); 1310 } 1311 1312 public 1313 RegionScanner getScanner(long scannerId) { 1314 String scannerIdString = Long.toString(scannerId); 1315 RegionScannerHolder scannerHolder = scanners.get(scannerIdString); 1316 if (scannerHolder != null) { 1317 return scannerHolder.s; 1318 } 1319 return null; 1320 } 1321 1322 public String getScanDetailsWithId(long scannerId) { 1323 RegionScanner scanner = getScanner(scannerId); 1324 if (scanner == null) { 1325 return null; 1326 } 1327 StringBuilder builder = new StringBuilder(); 1328 builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString()); 1329 builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString()); 1330 return builder.toString(); 1331 } 1332 1333 /** 1334 * Get the vtime associated with the scanner. 1335 * Currently the vtime is the number of "next" calls. 1336 */ 1337 long getScannerVirtualTime(long scannerId) { 1338 String scannerIdString = Long.toString(scannerId); 1339 RegionScannerHolder scannerHolder = scanners.get(scannerIdString); 1340 if (scannerHolder != null) { 1341 return scannerHolder.getNextCallSeq(); 1342 } 1343 return 0L; 1344 } 1345 1346 /** 1347 * Method to account for the size of retained cells and retained data blocks. 1348 * @return an object that represents the last referenced block from this response. 1349 */ 1350 Object addSize(RpcCallContext context, Result r, Object lastBlock) { 1351 if (context != null && r != null && !r.isEmpty()) { 1352 for (Cell c : r.rawCells()) { 1353 context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c)); 1354 1355 // Since byte buffers can point all kinds of crazy places it's harder to keep track 1356 // of which blocks are kept alive by what byte buffer. 
1357 // So we make a guess. 1358 if (c instanceof ByteBufferExtendedCell) { 1359 ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c; 1360 ByteBuffer bb = bbCell.getValueByteBuffer(); 1361 if (bb != lastBlock) { 1362 context.incrementResponseBlockSize(bb.capacity()); 1363 lastBlock = bb; 1364 } 1365 } else { 1366 // We're using the last block being the same as the current block as 1367 // a proxy for pointing to a new block. This won't be exact. 1368 // If there are multiple gets that bounce back and forth 1369 // Then it's possible that this will over count the size of 1370 // referenced blocks. However it's better to over count and 1371 // use two rpcs than to OOME the regionserver. 1372 byte[] valueArray = c.getValueArray(); 1373 if (valueArray != lastBlock) { 1374 context.incrementResponseBlockSize(valueArray.length); 1375 lastBlock = valueArray; 1376 } 1377 } 1378 1379 } 1380 } 1381 return lastBlock; 1382 } 1383 1384 private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper, 1385 HRegion r, boolean needCursor) throws LeaseStillHeldException { 1386 Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, 1387 new ScannerListener(scannerName)); 1388 RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease); 1389 RpcCallback closeCallback; 1390 if (s instanceof RpcCallback) { 1391 closeCallback = (RpcCallback) s; 1392 } else { 1393 closeCallback = new RegionScannerCloseCallBack(s); 1394 } 1395 RegionScannerHolder rsh = 1396 new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback, needCursor); 1397 RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); 1398 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! 
" + 1399 scannerName; 1400 return rsh; 1401 } 1402 1403 /** 1404 * Find the HRegion based on a region specifier 1405 * 1406 * @param regionSpecifier the region specifier 1407 * @return the corresponding region 1408 * @throws IOException if the specifier is not null, 1409 * but failed to find the region 1410 */ 1411 @VisibleForTesting 1412 public HRegion getRegion( 1413 final RegionSpecifier regionSpecifier) throws IOException { 1414 return regionServer.getRegion(regionSpecifier.getValue().toByteArray()); 1415 } 1416 1417 /** 1418 * Find the List of HRegions based on a list of region specifiers 1419 * 1420 * @param regionSpecifiers the list of region specifiers 1421 * @return the corresponding list of regions 1422 * @throws IOException if any of the specifiers is not null, 1423 * but failed to find the region 1424 */ 1425 private List<HRegion> getRegions(final List<RegionSpecifier> regionSpecifiers, 1426 final CacheEvictionStatsBuilder stats) { 1427 List<HRegion> regions = Lists.newArrayListWithCapacity(regionSpecifiers.size()); 1428 for (RegionSpecifier regionSpecifier: regionSpecifiers) { 1429 try { 1430 regions.add(regionServer.getRegion(regionSpecifier.getValue().toByteArray())); 1431 } catch (NotServingRegionException e) { 1432 stats.addException(regionSpecifier.getValue().toByteArray(), e); 1433 } 1434 } 1435 return regions; 1436 } 1437 1438 @VisibleForTesting 1439 public PriorityFunction getPriority() { 1440 return priority; 1441 } 1442 1443 @VisibleForTesting 1444 public Configuration getConfiguration() { 1445 return regionServer.getConfiguration(); 1446 } 1447 1448 private RegionServerRpcQuotaManager getRpcQuotaManager() { 1449 return regionServer.getRegionServerRpcQuotaManager(); 1450 } 1451 1452 private RegionServerSpaceQuotaManager getSpaceQuotaManager() { 1453 return regionServer.getRegionServerSpaceQuotaManager(); 1454 } 1455 1456 void start(ZKWatcher zkWatcher) { 1457 if (AccessChecker.isAuthorizationSupported(getConfiguration())) { 1458 accessChecker 
= new AccessChecker(getConfiguration(), zkWatcher); 1459 } 1460 this.scannerIdGenerator = new ScannerIdGenerator(this.regionServer.serverName); 1461 rpcServer.start(); 1462 } 1463 1464 void stop() { 1465 if (accessChecker != null) { 1466 accessChecker.stop(); 1467 } 1468 closeAllScanners(); 1469 rpcServer.stop(); 1470 } 1471 1472 /** 1473 * Called to verify that this server is up and running. 1474 */ 1475 // TODO : Rename this and HMaster#checkInitialized to isRunning() (or a better name). 1476 protected void checkOpen() throws IOException { 1477 if (regionServer.isAborted()) { 1478 throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting"); 1479 } 1480 if (regionServer.isStopped()) { 1481 throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping"); 1482 } 1483 if (!regionServer.fsOk) { 1484 throw new RegionServerStoppedException("File system not available"); 1485 } 1486 if (!regionServer.isOnline()) { 1487 throw new ServerNotRunningYetException("Server " + regionServer.serverName 1488 + " is not running yet"); 1489 } 1490 } 1491 1492 /** 1493 * By default, put up an Admin and a Client Service. 1494 * Set booleans <code>hbase.regionserver.admin.executorService</code> and 1495 * <code>hbase.regionserver.client.executorService</code> if you want to enable/disable services. 1496 * Default is that both are enabled. 
   * @return immutable list of blocking services and the security info classes that this server
   *   supports
   */
  protected List<BlockingServiceAndInterface> getServices() {
    boolean admin =
        getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
    boolean client =
        getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
    if (client) {
      bssi.add(new BlockingServiceAndInterface(
          ClientService.newReflectiveBlockingService(this),
          ClientService.BlockingInterface.class));
    }
    if (admin) {
      bssi.add(new BlockingServiceAndInterface(
          AdminService.newReflectiveBlockingService(this),
          AdminService.BlockingInterface.class));
    }
    return new org.apache.hbase.thirdparty.com.google.common.collect.
        ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
  }

  /** @return the address this RPC server is bound to */
  public InetSocketAddress getSocketAddress() {
    return isa;
  }

  @Override
  public int getPriority(RequestHeader header, Message param, User user) {
    return priority.getPriority(header, param, user);
  }

  @Override
  public long getDeadline(RequestHeader header, Message param) {
    return priority.getDeadline(header, param);
  }

  /*
   * Check if an OOME and, if so, abort immediately to avoid creating more objects.
   *
   * @param e the throwable to inspect
   *
   * @return True if we OOME'd and are aborting.
   */
  @Override
  public boolean checkOOME(final Throwable e) {
    return exitIfOOME(e);
  }

  /**
   * Halt the JVM immediately if {@code e} is, wraps, or mentions an OutOfMemoryError,
   * so that no further allocation is attempted.
   *
   * @return true if an OOME was detected (the JVM is halting in that case)
   */
  public static boolean exitIfOOME(final Throwable e) {
    boolean stop = false;
    try {
      if (e instanceof OutOfMemoryError
          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
          || (e.getMessage() != null && e.getMessage().contains(
              "java.lang.OutOfMemoryError"))) {
        stop = true;
        LOG.error(HBaseMarkers.FATAL, "Run out of memory; "
            + RSRpcServices.class.getSimpleName() + " will abort itself immediately",
            e);
      }
    } finally {
      // halt(), not exit(): shutdown hooks could allocate and hit the OOME again.
      if (stop) {
        Runtime.getRuntime().halt(1);
      }
    }
    return stop;
  }

  /**
   * Close a region on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException raised while closing
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public CloseRegionResponse closeRegion(final RpcController controller,
      final CloseRegionRequest request) throws ServiceException {
    final ServerName sn = (request.hasDestinationServer() ?
        ProtobufUtil.toServerName(request.getDestinationServer()) : null);

    try {
      checkOpen();
      if (request.hasServerStartCode()) {
        // check that we are the same server that this RPC is intended for.
        long serverStartCode = request.getServerStartCode();
        if (regionServer.serverName.getStartcode() != serverStartCode) {
          throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
              "different server with startCode: " + serverStartCode + ", this server is: "
              + regionServer.serverName));
        }
      }
      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());

      requestCount.increment();
      if (sn == null) {
        LOG.info("Close " + encodedRegionName + " without moving");
      } else {
        LOG.info("Close " + encodedRegionName + ", moving to " + sn);
      }
      boolean closed = regionServer.closeRegion(encodedRegionName, false, sn);
      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Compact a region on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException raised while requesting the compaction
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public CompactRegionResponse compactRegion(final RpcController controller,
      final CompactRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      // Quota support is enabled, the requesting user is not system/super user
      // and a quota policy is enforced that disables compactions.
      if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
          !Superusers.isSuperUser(RpcServer.getRequestUser().orElse(null)) &&
          this.regionServer.getRegionServerSpaceQuotaManager()
              .areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
        throw new DoNotRetryIOException(
            "Compactions on this region are " + "disabled due to a space quota violation.");
      }
      region.startRegionOperation(Operation.COMPACT_REGION);
      LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
      boolean major = request.hasMajor() && request.getMajor();
      if (request.hasFamily()) {
        // Family-scoped compaction: only the requested column family is compacted.
        byte[] family = request.getFamily().toByteArray();
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
            region.getRegionInfo().getRegionNameAsString() + " and family " +
            Bytes.toString(family);
        LOG.trace(log);
        region.requestCompaction(family, log, Store.PRIORITY_USER, major,
            CompactionLifeCycleTracker.DUMMY);
      } else {
        String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
            region.getRegionInfo().getRegionNameAsString();
        LOG.trace(log);
        region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
      }
      return CompactRegionResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Flush a region on the region server.
1656 * 1657 * @param controller the RPC controller 1658 * @param request the request 1659 * @throws ServiceException 1660 */ 1661 @Override 1662 @QosPriority(priority=HConstants.ADMIN_QOS) 1663 public FlushRegionResponse flushRegion(final RpcController controller, 1664 final FlushRegionRequest request) throws ServiceException { 1665 try { 1666 checkOpen(); 1667 requestCount.increment(); 1668 HRegion region = getRegion(request.getRegion()); 1669 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); 1670 boolean shouldFlush = true; 1671 if (request.hasIfOlderThanTs()) { 1672 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); 1673 } 1674 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); 1675 if (shouldFlush) { 1676 boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? 1677 request.getWriteFlushWalMarker() : false; 1678 // Go behind the curtain so we can manage writing of the flush WAL marker 1679 HRegion.FlushResultImpl flushResult = 1680 region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); 1681 boolean compactionNeeded = flushResult.isCompactionNeeded(); 1682 if (compactionNeeded) { 1683 regionServer.compactSplitThread.requestSystemCompaction(region, 1684 "Compaction through user triggered flush"); 1685 } 1686 builder.setFlushed(flushResult.isFlushSucceeded()); 1687 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker); 1688 } 1689 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores()); 1690 return builder.build(); 1691 } catch (DroppedSnapshotException ex) { 1692 // Cache flush can fail in a few places. If it fails in a critical 1693 // section, we get a DroppedSnapshotException and a replay of wal 1694 // is required. Currently the only way to do this is a restart of 1695 // the server. 1696 regionServer.abort("Replay of WAL required. 
Forcing server shutdown", ex); 1697 throw new ServiceException(ex); 1698 } catch (IOException ie) { 1699 throw new ServiceException(ie); 1700 } 1701 } 1702 1703 @Override 1704 @QosPriority(priority=HConstants.ADMIN_QOS) 1705 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, 1706 final GetOnlineRegionRequest request) throws ServiceException { 1707 try { 1708 checkOpen(); 1709 requestCount.increment(); 1710 Map<String, HRegion> onlineRegions = regionServer.onlineRegions; 1711 List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); 1712 for (HRegion region: onlineRegions.values()) { 1713 list.add(region.getRegionInfo()); 1714 } 1715 Collections.sort(list, RegionInfo.COMPARATOR); 1716 return ResponseConverter.buildGetOnlineRegionResponse(list); 1717 } catch (IOException ie) { 1718 throw new ServiceException(ie); 1719 } 1720 } 1721 1722 @Override 1723 @QosPriority(priority=HConstants.ADMIN_QOS) 1724 public GetRegionInfoResponse getRegionInfo(final RpcController controller, 1725 final GetRegionInfoRequest request) throws ServiceException { 1726 try { 1727 checkOpen(); 1728 requestCount.increment(); 1729 HRegion region = getRegion(request.getRegion()); 1730 RegionInfo info = region.getRegionInfo(); 1731 byte[] bestSplitRow = null; 1732 boolean shouldSplit = true; 1733 if (request.hasBestSplitRow() && request.getBestSplitRow()) { 1734 HRegion r = region; 1735 region.startRegionOperation(Operation.SPLIT_REGION); 1736 r.forceSplit(null); 1737 // Even after setting force split if split policy says no to split then we should not split. 
        shouldSplit = region.getSplitPolicy().shouldSplit() && !info.isMetaRegion();
        bestSplitRow = r.checkSplit();
        // when all table data are in memstore, bestSplitRow = null
        // try to flush region first
        if (bestSplitRow == null) {
          r.flush(true);
          bestSplitRow = r.checkSplit();
        }
        r.clearSplit();
      }
      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
      builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
      if (request.hasCompactionState() && request.getCompactionState()) {
        builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState()));
      }
      builder.setSplittable(region.isSplittable() && shouldSplit);
      builder.setMergeable(region.isMergeable());
      if (request.hasBestSplitRow() && request.getBestSplitRow() && bestSplitRow != null) {
        builder.setBestSplitRow(UnsafeByteOperations.unsafeWrap(bestSplitRow));
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetRegionLoadResponse getRegionLoad(RpcController controller,
      GetRegionLoadRequest request) throws ServiceException {

    // No table name in the request means: report load for every region on this server.
    List<HRegion> regions;
    if (request.hasTableName()) {
      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
      regions = regionServer.getRegions(tableName);
    } else {
      regions = regionServer.getRegions();
    }
    List<RegionLoad> rLoads = new ArrayList<>(regions.size());
    RegionLoad.Builder regionLoadBuilder = ClusterStatusProtos.RegionLoad.newBuilder();
    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();

    try {
      for (HRegion region : regions) {
        rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier));
      }
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    GetRegionLoadResponse.Builder builder = GetRegionLoadResponse.newBuilder();
    builder.addAllRegionLoads(rLoads);
    return builder.build();
  }

  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
      ClearCompactionQueuesRequest request) throws ServiceException {
    LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
        + RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
    ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
    requestCount.increment();
    // Only one clear-queues request may run at a time; concurrent ones are ignored with a warning.
    if (clearCompactionQueues.compareAndSet(false, true)) {
      try {
        checkOpen();
        regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
        for (String queueName : request.getQueueNameList()) {
          LOG.debug("clear " + queueName + " compaction queue");
          switch (queueName) {
            case "long":
              regionServer.compactSplitThread.clearLongCompactionsQueue();
              break;
            case "short":
              regionServer.compactSplitThread.clearShortCompactionsQueue();
              break;
            default:
              LOG.warn("Unknown queue name " + queueName);
              throw new IOException("Unknown queue name " + queueName);
          }
        }
        regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      } finally {
        // Release the gate even on failure so later requests are not locked out.
        clearCompactionQueues.set(false);
      }
    } else {
      LOG.warn("Clear compactions queue is executing by other admin.");
    }
    return respBuilder.build();
  }

  /**
   * Get some information of the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException raised by the open-check
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetServerInfoResponse getServerInfo(final RpcController controller,
      final GetServerInfoRequest request) throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
    requestCount.increment();
    int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
    return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
  }

  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public GetStoreFileResponse getStoreFile(final RpcController controller,
      final GetStoreFileRequest request) throws ServiceException {
    try {
      checkOpen();
      HRegion region = getRegion(request.getRegion());
      requestCount.increment();
      Set<byte[]> columnFamilies;
      if (request.getFamilyCount() == 0) {
        // No families requested: list store files for every family of the table.
        columnFamilies = region.getTableDescriptor().getColumnFamilyNames();
      } else {
        columnFamilies = new TreeSet<>(Bytes.BYTES_RAWCOMPARATOR);
        for (ByteString cf: request.getFamilyList()) {
          columnFamilies.add(cf.toByteArray());
        }
      }
      int nCF = columnFamilies.size();
      List<String> fileList = region.getStoreFileList(
          columnFamilies.toArray(new byte[nCF][]));
      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
      builder.addAllStoreFile(fileList);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Open asynchronously a region or a set of regions on the region server.
   *
   * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
   * before being called. As a consequence, this method should be called only from the master.
   * <p>
   * The different states managed for the region are:
   * </p><ul>
   * <li>region not opened: the region opening will start asynchronously.</li>
   * <li>a close is already in progress: this is considered as an error.</li>
   * <li>an open is already in progress: this new open request will be ignored. This is important
   * because the Master can do multiple requests if it crashes.</li>
   * <li>the region is already opened: this new open request will be ignored.</li>
   * </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException for a single-region open
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
      final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    if (request.hasServerStartCode()) {
      // check that we are the same server that this RPC is intended for.
      long serverStartCode = request.getServerStartCode();
      if (regionServer.serverName.getStartcode() != serverStartCode) {
        throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
            "different server with startCode: " + serverStartCode + ", this server is: "
            + regionServer.serverName));
      }
    }

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    final Map<TableName, TableDescriptor> htds = new HashMap<>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      TableName tableName = null;
      if (regionCount == 1) {
        org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri =
            request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
          HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
      long endTime = System.currentTimeMillis() + timeout;
      synchronized (regionServer.online) {
        try {
          while (System.currentTimeMillis() <= endTime
              && !regionServer.isStopped() && !regionServer.isOnline()) {
            regionServer.online.wait(regionServer.msgInterval);
          }
          checkOpen();
        } catch (InterruptedException t) {
          // Re-assert the interrupt status before translating to an RPC failure.
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    long masterSystemTime = request.hasMasterSystemTime() ?
        request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final RegionInfo region = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
      TableDescriptor htd;
      try {
        String encodedName = region.getEncodedName();
        byte[] encodedNameBytes = region.getEncodedNameAsBytes();
        final HRegion onlineRegion = regionServer.getRegion(encodedName);
        if (onlineRegion != null) {
          // The region is already online. This should not happen any more.
          String error = "Received OPEN for the region:"
              + region.getRegionNameAsString() + ", which is already online";
          LOG.warn(error);
          //regionServer.abort(error);
          //throw new IOException(error);
          builder.addOpeningState(RegionOpeningState.OPENED);
          continue;
        }
        LOG.info("Open " + region.getRegionNameAsString());

        // TRUE marks "opening in progress" in the regions-in-transition map;
        // FALSE would mean a close is in progress for the same region.
        final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
            encodedNameBytes, Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          if (regionServer.getRegion(encodedName) != null) {
            // There is a close in progress. This should not happen any more.
            String error = "Received OPEN for the region:"
                + region.getRegionNameAsString() + ", which we are already trying to CLOSE";
            regionServer.abort(error);
            throw new IOException(error);
          }
          regionServer.regionsInTransitionInRS.put(encodedNameBytes, Boolean.TRUE);
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" +
              region.getRegionNameAsString() + ", which we are already trying to OPEN"
              + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        regionServer.removeFromMovedRegions(region.getEncodedName());

        if (previous == null || !previous.booleanValue()) {
          htd = htds.get(region.getTable());
          if (htd == null) {
            // Cache the descriptor: a bulk assign may open many regions of the same table.
            htd = regionServer.tableDescriptors.get(region.getTable());
            htds.put(region.getTable(), htd);
          }
          if (htd == null) {
            throw new IOException("Missing table descriptor for " + region.getEncodedName());
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (regionServer.executorService == null) {
            LOG.info("No executor executorService; skipping open request");
          } else {
            if (region.isMetaRegion()) {
              regionServer.executorService.submit(new OpenMetaHandler(
                  regionServer, regionServer, region, htd, masterSystemTime));
            } else {
              if (regionOpenInfo.getFavoredNodesCount() > 0) {
                regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
                    regionOpenInfo.getFavoredNodesList());
              }
              if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
                regionServer.executorService.submit(new OpenPriorityRegionHandler(
                    regionServer, regionServer, region, htd, masterSystemTime));
              } else {
                regionServer.executorService.submit(new OpenRegionHandler(
                    regionServer, regionServer, region, htd, masterSystemTime));
              }
            }
          }
        }

        builder.addOpeningState(RegionOpeningState.OPENED);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }

  /**
   * Warmup a region on this server.
   *
   * This method should only be called by Master.
   * It synchronously opens the region and
   * closes the region bringing the most important pages in cache.
   * <p>
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException raised during warmup
   */
  @Override
  public WarmupRegionResponse warmupRegion(final RpcController controller,
      final WarmupRegionRequest request) throws ServiceException {

    final RegionInfo region = ProtobufUtil.toRegionInfo(request.getRegionInfo());
    TableDescriptor htd;
    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();

    try {
      checkOpen();
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final HRegion onlineRegion = regionServer.getRegion(encodedName);

      if (onlineRegion != null) {
        LOG.info("Region already online. Skipping warming up " + region);
        return response;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Warming up Region " + region.getRegionNameAsString());
      }

      htd = regionServer.tableDescriptors.get(region.getTable());

      if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
        LOG.info("Region is in transition. Skipping warmup " + region);
        return response;
      }

      HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
          regionServer.getConfiguration(), regionServer, null);

    } catch (IOException ie) {
      LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
      throw new ServiceException(ie);
    }

    return response;
  }

  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException raised while replaying
   */
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
      final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    CellScanner cells = ((HBaseRpcController) controller).cellScanner();
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
          ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
              ? region.getCoprocessorHost()
              : null; // do not invoke coprocessors if this is a secondary region replica
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        // All entries in one replay request must target the same region.
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple " +
              "regions. First region:" + regionName.toStringUtf8() + " , other region:"
              + entry.getKey().getEncodedRegionName());
        }
        if (regionServer.nonceManager != null && isPrimary) {
          long nonceGroup = entry.getKey().hasNonceGroup()
              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ?
              entry.getKey().getNonce() : HConstants.NO_NONCE;
          regionServer.nonceManager.reportOperationFromWal(
              nonceGroup,
              nonce,
              entry.getKey().getWriteTime());
        }
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
        List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
            cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
              walEntry.getSecond())) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if (edits != null && !edits.isEmpty()) {
          // HBASE-17924
          // sort to improve lock efficiency
          Collections.sort(edits, (v1, v2) -> Row.COMPARATOR.compare(v1.mutation, v2.mutation));
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
              entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      //sync wal at the end because ASYNC_WAL is used above
      WAL wal = region.getWAL();
      if (wal != null) {
        wal.sync();
      }

      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
              entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Always record replay latency, success or failure.
      if (regionServer.metricsRegionServer != null) {
        regionServer.metricsRegionServer.updateReplay(
            EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }

  /**
   * Replicate WAL entries on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException, or if replication is not initialized yet
   */
  @Override
  @QosPriority(priority=HConstants.REPLICATION_QOS)
  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
      final ReplicateWALEntryRequest request) throws ServiceException {
    try {
      checkOpen();
      if (regionServer.replicationSinkHandler != null) {
        requestCount.increment();
        List<WALEntry> entries = request.getEntryList();
        CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner();
        regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
        regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
            request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
            request.getSourceHFileArchiveDirPath());
        regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries();
        return ReplicateWALEntryResponse.newBuilder().build();
      } else {
        throw new ServiceException("Replication services are not initialized yet");
      }
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Roll the WAL writer of the region server.
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException wrapping any IOException raised while rolling the WAL
   */
  @Override
  public RollWALWriterResponse rollWALWriter(final RpcController controller,
      final RollWALWriterRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
      regionServer.walRoller.requestRollAll();
      regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }


  /**
   * Stop the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException declared by the RPC contract; not thrown by this implementation
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public StopServerResponse stopServer(final RpcController controller,
      final StopServerRequest request) throws ServiceException {
    requestCount.increment();
    String reason = request.getReason();
    regionServer.stop(reason);
    return StopServerResponse.newBuilder().build();
  }

  @Override
  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
      UpdateFavoredNodesRequest request) throws ServiceException {
    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
      RegionInfo hri = ProtobufUtil.toRegionInfo(regionUpdateInfo.getRegion());
      // Entries without favored nodes are skipped; only non-empty lists update the mapping.
      if (regionUpdateInfo.getFavoredNodesCount() > 0) {
        regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
            regionUpdateInfo.getFavoredNodesList());
      }
    }
    respBuilder.setResponse(openInfoList.size());
    return respBuilder.build();
  }

  /**
   * Atomically bulk load several HFiles into an open region
   * @return true if successful, false is failed but recoverably (no action)
   * @throws ServiceException if failed unrecoverably
   */
  @Override
  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
      final BulkLoadHFileRequest request) throws ServiceException {
    long start = EnvironmentEdgeManager.currentTime();
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      Map<byte[], List<Path>> map = null;

      // Check to see if this bulk load would exceed the space quota for this table
      if (QuotaUtil.isQuotaEnabled(getConfiguration())) {
        ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
        SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(
            region);
        if (enforcement != null) {
          // Bulk loads must still be atomic. We must enact all or none.
          List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
          for (FamilyPath familyPath : request.getFamilyPathList()) {
            filePaths.add(familyPath.getPath());
          }
          // Check if the batch of files exceeds the current quota
          enforcement.checkBulkLoad(regionServer.getFileSystem(), filePaths);
        }
      }

      // Translate the proto (family, path) pairs into the form the region API expects.
      List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
      for (FamilyPath familyPath : request.getFamilyPathList()) {
        familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
      }
      if (!request.hasBulkToken()) {
        // Plain (non-secure) bulk load path, bracketed by the coprocessor hooks.
        if (region.getCoprocessorHost() != null) {
          region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
        }
        try {
          map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
            request.getCopyFile());
        } finally {
          // post hook runs even on failure; map stays null in that case.
          if (region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
          }
        }
      } else {
        // secure bulk load
        map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
      }
      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
      builder.setLoaded(map != null);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      if (regionServer.metricsRegionServer != null) {
        regionServer.metricsRegionServer.updateBulkLoad(
            EnvironmentEdgeManager.currentTime() - start);
      }
    }
  }

  /**
   * Obtain a bulk-load token for the given region from the secure bulk load manager.
   *
   * @param controller the RPC controller
   * @param request names the region to prepare
   * @throws ServiceException wrapping any IOException
   */
  @Override
  public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
      PrepareBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request);
      PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
      builder.setBulkToken(bulkToken);
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Release the bulk-load state previously created by {@link #prepareBulkLoad}.
   *
   * @param controller the RPC controller
   * @param request names the region/token to clean up
   * @throws ServiceException wrapping any IOException
   */
  @Override
  public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
      CleanupBulkLoadRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();

      HRegion region = getRegion(request.getRegion());

      regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request);
      CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build();
      return response;
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /**
   * Execute a region-scoped coprocessor endpoint call and return its serialized result.
   *
   * @param controller the RPC controller
   * @param request carries the region and the endpoint call to run
   * @throws ServiceException wrapping any IOException
   */
  @Override
  public CoprocessorServiceResponse execService(final RpcController controller,
      final CoprocessorServiceRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall());
      CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder();
      builder.setRegion(RequestConverter.buildRegionSpecifier(
        RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
      // TODO: COPIES!!!!!!
      builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).
        setValue(org.apache.hbase.thirdparty.com.google.protobuf.ByteString.
          copyFrom(result.toByteArray())));
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

  /** Run an endpoint call against the region with a fresh server-side controller. */
  private com.google.protobuf.Message execServiceOnRegion(HRegion region,
      final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
    // ignore the passed in controller (from the serialized call)
    ServerRpcController execController = new ServerRpcController();
    return region.execService(execController, serviceCall);
  }

  /**
   * Get data from a table.
   *
   * @param controller the RPC controller
   * @param request the get request
   * @throws ServiceException
   */
  @Override
  public GetResponse get(final RpcController controller,
      final GetRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    HRegion region = null;
    try {
      checkOpen();
      requestCount.increment();
      rpcGetRequestCount.increment();
      region = getRegion(request.getRegion());

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
      // a get closest before. Throwing the UnknownProtocolException signals it that it needs
      // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
      // they are; its a problem for non-native clients like asynchbase. HBASE-20225.
      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? " +
            "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by " +
            "reverse Scan.");
      }
      Boolean existence = null;
      Result r = null;
      RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);

      Get clientGet = ProtobufUtil.toGet(get);
      // existence-only gets let the pre-hook short-circuit the read entirely.
      if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
        existence = region.getCoprocessorHost().preExists(clientGet);
      }
      if (existence == null) {
        if (context != null) {
          r = get(clientGet, (region), null, context);
        } else {
          // for test purpose
          r = region.get(clientGet);
        }
        if (get.getExistenceOnly()) {
          boolean exists = r.getExists();
          if (region.getCoprocessorHost() != null) {
            exists = region.getCoprocessorHost().postExists(clientGet, exists);
          }
          existence = exists;
        }
      }
      if (existence != null) {
        ClientProtos.Result pbr =
            ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else if (r != null) {
        ClientProtos.Result pbr;
        // Ship cells out-of-band via the cell block when the client supports it (1.3+);
        // otherwise the data is embedded in the protobuf Result.
        if (isClientCellBlockSupport(context) && controller instanceof HBaseRpcController
            && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 3)) {
          pbr = ProtobufUtil.toResultNoData(r);
          ((HBaseRpcController) controller).setCellScanner(CellUtil.createCellScanner(r
              .rawCells()));
          addSize(context, r, null);
        } else {
          pbr = ProtobufUtil.toResult(r);
        }
        builder.setResult(pbr);
      }
      //r.cells is null when an table.exists(get) call
      if (r != null && r.rawCells() != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      MetricsRegionServer mrs = regionServer.metricsRegionServer;
      if (mrs != null) {
        TableDescriptor td = region != null? region.getTableDescriptor(): null;
        if (td != null) {
          mrs.updateGet(td.getTableName(), EnvironmentEdgeManager.currentTime() - before);
        }
      }
      if (quota != null) {
        quota.close();
      }
    }
  }

  /**
   * Server-internal get that runs the Get as a one-row Scan so the scanner can be
   * handed to the RpcCallContext (or the multi() close callback) for deferred close.
   *
   * @param get the client Get to execute
   * @param region the region to read from
   * @param closeCallBack non-null when called from multi(); collects scanners to close later
   * @param context current RPC call context; owns the scanner when closeCallBack is null
   */
  private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
      RpcCallContext context) throws IOException {
    region.prepareGet(get);
    boolean stale = region.getRegionInfo().getReplicaId() != 0;

    // This method is almost the same as HRegion#get.
    List<Cell> results = new ArrayList<>();
    long before = EnvironmentEdgeManager.currentTime();
    // pre-get CP hook
    if (region.getCoprocessorHost() != null) {
      if (region.getCoprocessorHost().preGet(get, results)) {
        region.metricsUpdateForGet(results, before);
        return Result
            .create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
      }
    }
    Scan scan = new Scan(get);
    if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }
    RegionScannerImpl scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(results);
    } finally {
      if (scanner != null) {
        if (closeCallBack == null) {
          // If there is a context then the scanner can be added to the current
          // RpcCallContext. The rpc callback will take care of closing the
          // scanner, for eg in case
          // of get()
          context.setCallBack(scanner);
        } else {
          // The call is from multi() where the results from the get() are
          // aggregated and then send out to the
          // rpc. The rpccall back will close all such scanners created as part
          // of multi().
          closeCallBack.addScanner(scanner);
        }
      }
    }

    // post-get CP hook
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postGet(get, results);
    }
    region.metricsUpdateForGet(results, before);

    return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
  }

  /**
   * Count the actions across all RegionActions in the request and log a warning
   * (keyed by the first region's name) when the total exceeds rowSizeWarnThreshold.
   */
  private void checkBatchSizeAndLogLargeSize(MultiRequest request) {
    int sum = 0;
    String firstRegionName = null;
    for (RegionAction regionAction : request.getRegionActionList()) {
      if (sum == 0) {
        firstRegionName = Bytes.toStringBinary(regionAction.getRegion().getValue().toByteArray());
      }
      sum += regionAction.getActionCount();
    }
    if (sum > rowSizeWarnThreshold) {
      ld.logBatchWarning(firstRegionName, sum, rowSizeWarnThreshold);
    }
  }

  /**
   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
   *
   * @param rpcc the RPC controller
   * @param request the multi request
   * @throws ServiceException
   */
  @Override
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
      throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController)rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
    // Clear the inbound scanner so we do not hold the reference across the call.
    if (controller != null) {
      controller.setCellScanner(null);
    }

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<CellScannable> cellsToReturn = null;
    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    Boolean processed = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
      .getRegionActionCount());
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      regionActionResultBuilder.clear();
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
      } catch (IOException e) {
        rpcServer.getMetrics().exception(e);
        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
        // All Mutations in this RegionAction not executed as we can not see the Region online here
        // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
        // corresponding to these Mutations.
        skipCellsForMutations(regionAction.getActionList(), cellScanner);
        continue;  // For this region it's a failure.
      }

      if (regionAction.hasAtomic() && regionAction.getAtomic()) {
        // How does this call happen?  It may need some work to play well w/ the surroundings.
        // Need to return an item per Action along w/ Action index.  TODO.
        try {
          if (request.hasCondition()) {
            Condition condition = request.getCondition();
            byte[] row = condition.getRow().toByteArray();
            byte[] family = condition.getFamily().toByteArray();
            byte[] qualifier = condition.getQualifier().toByteArray();
            CompareOperator op =
              CompareOperator.valueOf(condition.getCompareType().name());
            ByteArrayComparable comparator =
              ProtobufUtil.toComparator(condition.getComparator());
            TimeRange timeRange = condition.hasTimeRange() ?
              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
              TimeRange.allTime();
            processed =
              checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
                qualifier, op, comparator, timeRange, regionActionResultBuilder,
                spaceQuotaEnforcement);
          } else {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, spaceQuotaEnforcement);
            processed = Boolean.TRUE;
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's atomic, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } else {
        // doNonAtomicRegionMutation manages the exception internally
        if (context != null && closeCallBack == null) {
          // An RpcCallBack that creates a list of scanners that needs to perform callBack
          // operation on completion of multiGets.
          // Set this only once
          closeCallBack = new RegionScannersCloseCallBack();
          context.setCallBack(closeCallBack);
        }
        cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
          regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
          spaceQuotaEnforcement);
      }
      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      quota.close();
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
    }

    if (processed != null) {
      responseBuilder.setProcessed(processed);
    }

    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /** Skip the cells backing every mutation in {@code actions}; no-op when there is no scanner. */
  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    for (Action action : actions) {
      skipCellsForMutation(action, cellScanner);
    }
  }

  /**
   * Advance the CellScanner past the cells associated with one mutation so that
   * subsequent RegionActions read their own cells from the shared scanner.
   */
  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
    if (cellScanner == null) {
      return;
    }
    try {
      if (action.hasMutation()) {
        MutationProto m = action.getMutation();
        if (m.hasAssociatedCellCount()) {
          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
            cellScanner.advance();
          }
        }
      }
    } catch (IOException e) {
      // No need to handle these Individual Muatation level
      // issue. Any way this entire RegionAction
      // marked as failed as we could not see the Region here. At client side the top level
      // RegionAction exception will be considered first.
      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
    }
  }

  /**
   * Mutate data in a table.
   *
   * @param rpcc the RPC controller
   * @param request the mutate request
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc,
      final MutateRequest request) throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController)rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
    OperationQuota quota = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    ActivePolicyEnforcement spaceQuotaEnforcement = null;
    MutationType type = null;
    HRegion region = null;
    long before = EnvironmentEdgeManager.currentTime();
    // Clear scanner so we are not holding on to reference across call.
    if (controller != null) {
      controller.setCellScanner(null);
    }
    try {
      checkOpen();
      requestCount.increment();
      rpcMutateRequestCount.increment();
      region = getRegion(request.getRegion());
      MutateResponse.Builder builder = MutateResponse.newBuilder();
      MutationProto mutation = request.getMutation();
      if (!region.getRegionInfo().isMetaRegion()) {
        regionServer.cacheFlusher.reclaimMemStoreMemory();
      }
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      Result r = null;
      Boolean processed = null;
      type = mutation.getMutateType();

      quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
      spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

      switch (type) {
      case APPEND:
        // TODO: this doesn't actually check anything.
        r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
        break;
      case INCREMENT:
        // TODO: this doesn't actually check anything.
        r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
        break;
      case PUT:
        Put put = ProtobufUtil.toPut(mutation, cellScanner);
        checkCellSizeLimit(region, put);
        // Throws an exception when violated
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
        quota.addMutation(put);
        if (request.hasCondition()) {
          // checkAndPut: pre hook may decide the outcome; otherwise the region does
          // the compare-and-mutate and the post hook may override the result.
          Condition condition = request.getCondition();
          byte[] row = condition.getRow().toByteArray();
          byte[] family = condition.getFamily().toByteArray();
          byte[] qualifier = condition.getQualifier().toByteArray();
          CompareOperator compareOp =
            CompareOperator.valueOf(condition.getCompareType().name());
          ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
          TimeRange timeRange = condition.hasTimeRange() ?
            ProtobufUtil.toTimeRange(condition.getTimeRange()) :
            TimeRange.allTime();
          if (region.getCoprocessorHost() != null) {
            processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
              compareOp, comparator, put);
          }
          if (processed == null) {
            boolean result = region.checkAndMutate(row, family,
              qualifier, compareOp, comparator, timeRange, put);
            if (region.getCoprocessorHost() != null) {
              result = region.getCoprocessorHost().postCheckAndPut(row, family,
                qualifier, compareOp, comparator, put, result);
            }
            processed = result;
          }
        } else {
          region.put(put);
          processed = Boolean.TRUE;
        }
        break;
      case DELETE:
        Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
        checkCellSizeLimit(region, delete);
        spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
        quota.addMutation(delete);
        if (request.hasCondition()) {
          // checkAndDelete: mirrors the checkAndPut flow above.
          Condition condition = request.getCondition();
          byte[] row = condition.getRow().toByteArray();
          byte[] family = condition.getFamily().toByteArray();
          byte[] qualifier = condition.getQualifier().toByteArray();
          CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
          ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
          TimeRange timeRange = condition.hasTimeRange() ?
            ProtobufUtil.toTimeRange(condition.getTimeRange()) :
            TimeRange.allTime();
          if (region.getCoprocessorHost() != null) {
            processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
              comparator, delete);
          }
          if (processed == null) {
            boolean result = region.checkAndMutate(row, family,
              qualifier, op, comparator, timeRange, delete);
            if (region.getCoprocessorHost() != null) {
              result = region.getCoprocessorHost().postCheckAndDelete(row, family,
                qualifier, op, comparator, delete, result);
            }
            processed = result;
          }
        } else {
          region.delete(delete);
          processed = Boolean.TRUE;
        }
        break;
      default:
        throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
      }
      if (processed != null) {
        builder.setProcessed(processed.booleanValue());
      }
      boolean clientCellBlockSupported = isClientCellBlockSupport(context);
      addResult(builder, r, controller, clientCellBlockSupported);
      if (clientCellBlockSupported) {
        addSize(context, r, null);
      }
      return builder.build();
    } catch (IOException ie) {
      regionServer.checkFileSystem();
      throw new ServiceException(ie);
    } finally {
      if (quota != null) {
        quota.close();
      }
      // Update metrics
      if (regionServer.metricsRegionServer != null && type != null) {
        long after = EnvironmentEdgeManager.currentTime();
        switch (type) {
        case DELETE:
          if (request.hasCondition()) {
            regionServer.metricsRegionServer.updateCheckAndDelete(after - before);
          } else {
            regionServer.metricsRegionServer.updateDelete(
                region == null ? null : region.getRegionInfo().getTable(), after - before);
          }
          break;
        case PUT:
          if (request.hasCondition()) {
            regionServer.metricsRegionServer.updateCheckAndPut(after - before);
          } else {
            regionServer.metricsRegionServer.updatePut(
                region == null ?
                null : region.getRegionInfo().getTable(), after - before);
          }
          break;
        default:
          break;

        }
      }
    }
  }

  // This is used to keep compatible with the old client implementation. Consider remove it if we
  // decide to drop the support of the client that still sends close request to a region scanner
  // which has already been exhausted.
  @Deprecated
  private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {

    private static final long serialVersionUID = -4305297078988180130L;

    @Override
    public synchronized Throwable fillInStackTrace() {
      // Shared sentinel instance: suppress stack-trace capture so reuse is cheap.
      return this;
    }
  };

  /**
   * Look up the scanner named in the request, validating that it still exists and
   * that its region has not moved; closes and unregisters the scanner otherwise.
   *
   * @throws IOException SCANNER_ALREADY_CLOSED, UnknownScannerException or
   *   NotServingRegionException depending on what went stale
   */
  private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
    String scannerName = Long.toString(request.getScannerId());
    RegionScannerHolder rsh = scanners.get(scannerName);
    if (rsh == null) {
      // just ignore the next or close request if scanner does not exists.
      if (closedScanners.getIfPresent(scannerName) != null) {
        throw SCANNER_ALREADY_CLOSED;
      } else {
        LOG.warn("Client tried to access missing scanner " + scannerName);
        throw new UnknownScannerException(
            "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " +
                "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " +
                "long wait between consecutive client checkins, c) Server may be closing down, " +
                "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " +
                "possible fix would be increasing the value of" +
                "'hbase.client.scanner.timeout.period' configuration.");
      }
    }
    RegionInfo hri = rsh.s.getRegionInfo();
    // Yes, should be the same instance
    if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
      String msg = "Region has changed on the scanner " + scannerName + ": regionName="
          + hri.getRegionNameAsString() + ", scannerRegionName=" + rsh.r;
      LOG.warn(msg + ", closing...");
      scanners.remove(scannerName);
      try {
        rsh.s.close();
      } catch (IOException e) {
        LOG.warn("Getting exception closing " + scannerName, e);
      } finally {
        // Always drop the lease, even if close failed.
        try {
          regionServer.leases.cancelLease(scannerName);
        } catch (LeaseException e) {
          LOG.warn("Getting exception closing " + scannerName, e);
        }
      }
      throw new NotServingRegionException(msg);
    }
    return rsh;
  }

  /**
   * Open a new region scanner for the request, run the scanner-open coprocessor hooks,
   * assign a scanner id/ttl on the response builder and register the scanner.
   */
  private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder)
      throws IOException {
    HRegion region = getRegion(request.getRegion());
    ClientProtos.Scan protoScan = request.getScan();
    boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
    Scan scan = ProtobufUtil.toScan(protoScan);
    // if the request doesn't set this, get the default region setting.
    if (!isLoadingCfsOnDemandSet) {
      scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
    }

    if (!scan.hasFamilies()) {
      // Adding all families to scanner
      for (byte[] family : region.getTableDescriptor().getColumnFamilyNames()) {
        scan.addFamily(family);
      }
    }
    if (region.getCoprocessorHost() != null) {
      // preScannerOpen is not allowed to return a RegionScanner. Only post hook can create a
      // wrapper for the core created RegionScanner
      region.getCoprocessorHost().preScannerOpen(scan);
    }
    RegionScannerImpl coreScanner = region.getScanner(scan);
    Shipper shipper = coreScanner;
    RegionScanner scanner = coreScanner;
    if (region.getCoprocessorHost() != null) {
      scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
    }
    long scannerId = scannerIdGenerator.generateNewScannerId();
    builder.setScannerId(scannerId);
    builder.setMvccReadPoint(scanner.getMvccReadPoint());
    builder.setTtl(scannerLeaseTimeoutPeriod);
    String scannerName = String.valueOf(scannerId);
    return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult());
  }

  /**
   * Verify the client's nextCallSeq matches what the scanner expects (HBASE-5974),
   * throwing OutOfOrderScannerNextException on a mismatch.
   */
  private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
      throws OutOfOrderScannerNextException {
    // if nextCallSeq does not match throw Exception straight away. This needs to be
    // performed even before checking of Lease.
    // See HBASE-5974
    if (request.hasNextCallSeq()) {
      long callSeq = request.getNextCallSeq();
      if (!rsh.incNextCallSeq(callSeq)) {
        throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq()
            + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request="
            + TextFormat.shortDebugString(request));
      }
    }
  }

  /** Re-register a scanner lease that was removed while the scan call was running. */
  private void addScannerLeaseBack(Leases.Lease lease) {
    try {
      regionServer.leases.addLease(lease);
    } catch (LeaseStillHeldException e) {
      // should not happen as the scanner id is unique.
      throw new AssertionError(e);
    }
  }

  /**
   * Compute the wall-clock deadline for a scan RPC, or -1 when no time limit applies.
   *
   * @param controller may carry a per-call timeout that further restricts the limit
   * @param allowHeartbeatMessages only enforce a limit when heartbeats are allowed
   */
  private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) {
    // Set the time limit to be half of the more restrictive timeout value (one of the
    // timeout values must be positive). In the event that both values are positive, the
    // more restrictive of the two is used to calculate the limit.
    if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
      long timeLimitDelta;
      if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
        timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
      } else {
        timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
      }
      if (controller != null && controller.getCallTimeout() > 0) {
        timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
      }
      // Use half of whichever timeout value was more restrictive... But don't allow
      // the time limit to be less than the allowable minimum (could cause an
      // immediate timeout before scanning any data).
      timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
      // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
      // ManualEnvironmentEdge. Consider using System.nanoTime instead.
      return System.currentTimeMillis() + timeLimitDelta;
    }
    // Default value of timeLimit is negative to indicate no timeLimit should be
    // enforced.
    return -1L;
  }

  /**
   * If the row limit has been reached, mark the response as having no more results
   * (tracing the decision when trace logging is on).
   */
  private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
      ScannerContext scannerContext, ScanResponse.Builder builder) {
    if (numOfCompleteRows >= limitOfRows) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows +
            " scannerContext: " + scannerContext);
      }
      builder.setMoreResults(false);
    }
  }

  // return whether we have more results in region.
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
      long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
      ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context)
      throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    long maxResultSize;
    // The effective size limit is the tighter of the scanner's own limit and the quota.
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // This is cells inside a row. Default size is 10 so if many versions or many cfs,
    // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
    // arbitrary 32. TODO: keep record of general size of results being returned.
3060 List<Cell> values = new ArrayList<>(32); 3061 region.startRegionOperation(Operation.SCAN); 3062 try { 3063 int numOfResults = 0; 3064 int numOfCompleteRows = 0; 3065 long before = EnvironmentEdgeManager.currentTime(); 3066 synchronized (scanner) { 3067 boolean stale = (region.getRegionInfo().getReplicaId() != 0); 3068 boolean clientHandlesPartials = 3069 request.hasClientHandlesPartials() && request.getClientHandlesPartials(); 3070 boolean clientHandlesHeartbeats = 3071 request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats(); 3072 3073 // On the server side we must ensure that the correct ordering of partial results is 3074 // returned to the client to allow them to properly reconstruct the partial results. 3075 // If the coprocessor host is adding to the result list, we cannot guarantee the 3076 // correct ordering of partial results and so we prevent partial results from being 3077 // formed. 3078 boolean serverGuaranteesOrderOfPartials = results.isEmpty(); 3079 boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials; 3080 boolean moreRows = false; 3081 3082 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a 3083 // certain time threshold on the server. When the time threshold is exceeded, the 3084 // server stops the scan and sends back whatever Results it has accumulated within 3085 // that time period (may be empty). Since heartbeat messages have the potential to 3086 // create partial Results (in the event that the timeout occurs in the middle of a 3087 // row), we must only generate heartbeat messages when the client can handle both 3088 // heartbeats AND partials 3089 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults; 3090 3091 long timeLimit = getTimeLimit(controller, allowHeartbeatMessages); 3092 3093 final LimitScope sizeScope = 3094 allowPartialResults ? 
LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; 3095 final LimitScope timeScope = 3096 allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; 3097 3098 boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics(); 3099 3100 // Configure with limits for this RPC. Set keep progress true since size progress 3101 // towards size limit should be kept between calls to nextRaw 3102 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); 3103 // maxResultSize - either we can reach this much size for all cells(being read) data or sum 3104 // of heap size occupied by cells(being read). Cell data means its key and value parts. 3105 contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize); 3106 contextBuilder.setBatchLimit(scanner.getBatch()); 3107 contextBuilder.setTimeLimit(timeScope, timeLimit); 3108 contextBuilder.setTrackMetrics(trackMetrics); 3109 ScannerContext scannerContext = contextBuilder.build(); 3110 boolean limitReached = false; 3111 while (numOfResults < maxResults) { 3112 // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The 3113 // batch limit is a limit on the number of cells per Result. Thus, if progress is 3114 // being tracked (i.e. scannerContext.keepProgress() is true) then we need to 3115 // reset the batch progress between nextRaw invocations since we don't want the 3116 // batch progress from previous calls to affect future calls 3117 scannerContext.setBatchProgress(0); 3118 3119 // Collect values to be returned here 3120 moreRows = scanner.nextRaw(values, scannerContext); 3121 3122 if (!values.isEmpty()) { 3123 if (limitOfRows > 0) { 3124 // First we need to check if the last result is partial and we have a row change. If 3125 // so then we need to increase the numOfCompleteRows. 
3126 if (results.isEmpty()) { 3127 if (rsh.rowOfLastPartialResult != null && 3128 !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)) { 3129 numOfCompleteRows++; 3130 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, 3131 builder); 3132 } 3133 } else { 3134 Result lastResult = results.get(results.size() - 1); 3135 if (lastResult.mayHaveMoreCellsInRow() && 3136 !CellUtil.matchingRows(values.get(0), lastResult.getRow())) { 3137 numOfCompleteRows++; 3138 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, 3139 builder); 3140 } 3141 } 3142 if (builder.hasMoreResults() && !builder.getMoreResults()) { 3143 break; 3144 } 3145 } 3146 boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow(); 3147 Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow); 3148 lastBlock.setValue(addSize(context, r, lastBlock.getValue())); 3149 results.add(r); 3150 numOfResults++; 3151 if (!mayHaveMoreCellsInRow && limitOfRows > 0) { 3152 numOfCompleteRows++; 3153 checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder); 3154 if (builder.hasMoreResults() && !builder.getMoreResults()) { 3155 break; 3156 } 3157 } 3158 } else if (!moreRows && !results.isEmpty()) { 3159 // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of 3160 // last result is false. Otherwise it's possible that: the first nextRaw returned 3161 // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the 3162 // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will 3163 // return with moreRows=false, which means moreResultsInRegion would be false, it will 3164 // be a contradictory state (HBASE-21206). 
3165 int lastIdx = results.size() - 1; 3166 Result r = results.get(lastIdx); 3167 if (r.mayHaveMoreCellsInRow()) { 3168 results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false)); 3169 } 3170 } 3171 boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); 3172 boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); 3173 boolean resultsLimitReached = numOfResults >= maxResults; 3174 limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached; 3175 3176 if (limitReached || !moreRows) { 3177 if (LOG.isTraceEnabled()) { 3178 LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: " + moreRows 3179 + " scannerContext: " + scannerContext); 3180 } 3181 // We only want to mark a ScanResponse as a heartbeat message in the event that 3182 // there are more values to be read server side. If there aren't more values, 3183 // marking it as a heartbeat is wasteful because the client will need to issue 3184 // another ScanRequest only to realize that they already have all the values 3185 if (moreRows && timeLimitReached) { 3186 // Heartbeat messages occur when the time limit has been reached. 3187 builder.setHeartbeatMessage(true); 3188 if (rsh.needCursor) { 3189 Cell cursorCell = scannerContext.getLastPeekedCell(); 3190 if (cursorCell != null) { 3191 builder.setCursor(ProtobufUtil.toCursor(cursorCell)); 3192 } 3193 } 3194 } 3195 break; 3196 } 3197 values.clear(); 3198 } 3199 builder.setMoreResultsInRegion(moreRows); 3200 // Check to see if the client requested that we track metrics server side. If the 3201 // client requested metrics, retrieve the metrics from the scanner context. 
3202 if (trackMetrics) { 3203 Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap(); 3204 ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder(); 3205 NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder(); 3206 3207 for (Entry<String, Long> entry : metrics.entrySet()) { 3208 pairBuilder.setName(entry.getKey()); 3209 pairBuilder.setValue(entry.getValue()); 3210 metricBuilder.addMetrics(pairBuilder.build()); 3211 } 3212 3213 builder.setScanMetrics(metricBuilder.build()); 3214 } 3215 } 3216 long end = EnvironmentEdgeManager.currentTime(); 3217 long responseCellSize = context != null ? context.getResponseCellSize() : 0; 3218 region.getMetrics().updateScanTime(end - before); 3219 if (regionServer.metricsRegionServer != null) { 3220 regionServer.metricsRegionServer.updateScanSize( 3221 region.getTableDescriptor().getTableName(), responseCellSize); 3222 regionServer.metricsRegionServer.updateScanTime( 3223 region.getTableDescriptor().getTableName(), end - before); 3224 } 3225 } finally { 3226 region.closeRegionOperation(); 3227 } 3228 // coprocessor postNext hook 3229 if (region.getCoprocessorHost() != null) { 3230 region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true); 3231 } 3232 } 3233 3234 /** 3235 * Scan data in a table. 3236 * 3237 * @param controller the RPC controller 3238 * @param request the scan request 3239 * @throws ServiceException 3240 */ 3241 @Override 3242 public ScanResponse scan(final RpcController controller, final ScanRequest request) 3243 throws ServiceException { 3244 if (controller != null && !(controller instanceof HBaseRpcController)) { 3245 throw new UnsupportedOperationException( 3246 "We only do " + "HBaseRpcControllers! 
FIX IF A PROBLEM: " + controller); 3247 } 3248 if (!request.hasScannerId() && !request.hasScan()) { 3249 throw new ServiceException( 3250 new DoNotRetryIOException("Missing required input: scannerId or scan")); 3251 } 3252 try { 3253 checkOpen(); 3254 } catch (IOException e) { 3255 if (request.hasScannerId()) { 3256 String scannerName = Long.toString(request.getScannerId()); 3257 if (LOG.isDebugEnabled()) { 3258 LOG.debug( 3259 "Server shutting down and client tried to access missing scanner " + scannerName); 3260 } 3261 if (regionServer.leases != null) { 3262 try { 3263 regionServer.leases.cancelLease(scannerName); 3264 } catch (LeaseException le) { 3265 // No problem, ignore 3266 if (LOG.isTraceEnabled()) { 3267 LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); 3268 } 3269 } 3270 } 3271 } 3272 throw new ServiceException(e); 3273 } 3274 requestCount.increment(); 3275 rpcScanRequestCount.increment(); 3276 RegionScannerHolder rsh; 3277 ScanResponse.Builder builder = ScanResponse.newBuilder(); 3278 try { 3279 if (request.hasScannerId()) { 3280 // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000 3281 // for more details. 3282 builder.setScannerId(request.getScannerId()); 3283 rsh = getRegionScanner(request); 3284 } else { 3285 rsh = newRegionScanner(request, builder); 3286 } 3287 } catch (IOException e) { 3288 if (e == SCANNER_ALREADY_CLOSED) { 3289 // Now we will close scanner automatically if there are no more results for this region but 3290 // the old client will still send a close request to us. Just ignore it and return. 3291 return builder.build(); 3292 } 3293 throw new ServiceException(e); 3294 } 3295 HRegion region = rsh.r; 3296 String scannerName = rsh.scannerName; 3297 Leases.Lease lease; 3298 try { 3299 // Remove lease while its being processed in server; protects against case 3300 // where processing of request takes > lease expiration time. 
3301 lease = regionServer.leases.removeLease(scannerName); 3302 } catch (LeaseException e) { 3303 throw new ServiceException(e); 3304 } 3305 if (request.hasRenew() && request.getRenew()) { 3306 // add back and return 3307 addScannerLeaseBack(lease); 3308 try { 3309 checkScanNextCallSeq(request, rsh); 3310 } catch (OutOfOrderScannerNextException e) { 3311 throw new ServiceException(e); 3312 } 3313 return builder.build(); 3314 } 3315 OperationQuota quota; 3316 try { 3317 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); 3318 } catch (IOException e) { 3319 addScannerLeaseBack(lease); 3320 throw new ServiceException(e); 3321 } 3322 try { 3323 checkScanNextCallSeq(request, rsh); 3324 } catch (OutOfOrderScannerNextException e) { 3325 addScannerLeaseBack(lease); 3326 throw new ServiceException(e); 3327 } 3328 // Now we have increased the next call sequence. If we give client an error, the retry will 3329 // never success. So we'd better close the scanner and return a DoNotRetryIOException to client 3330 // and then client will try to open a new scanner. 3331 boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false; 3332 int rows; // this is scan.getCaching 3333 if (request.hasNumberOfRows()) { 3334 rows = request.getNumberOfRows(); 3335 } else { 3336 rows = closeScanner ? 0 : 1; 3337 } 3338 RpcCallContext context = RpcServer.getCurrentCall().orElse(null); 3339 // now let's do the real scan. 3340 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); 3341 RegionScanner scanner = rsh.s; 3342 // this is the limit of rows for this scan, if we the number of rows reach this value, we will 3343 // close the scanner. 
3344 int limitOfRows; 3345 if (request.hasLimitOfRows()) { 3346 limitOfRows = request.getLimitOfRows(); 3347 } else { 3348 limitOfRows = -1; 3349 } 3350 MutableObject<Object> lastBlock = new MutableObject<>(); 3351 boolean scannerClosed = false; 3352 try { 3353 List<Result> results = new ArrayList<>(); 3354 if (rows > 0) { 3355 boolean done = false; 3356 // Call coprocessor. Get region info from scanner. 3357 if (region.getCoprocessorHost() != null) { 3358 Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); 3359 if (!results.isEmpty()) { 3360 for (Result r : results) { 3361 lastBlock.setValue(addSize(context, r, lastBlock.getValue())); 3362 } 3363 } 3364 if (bypass != null && bypass.booleanValue()) { 3365 done = true; 3366 } 3367 } 3368 if (!done) { 3369 scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, 3370 results, builder, lastBlock, context); 3371 } else { 3372 builder.setMoreResultsInRegion(!results.isEmpty()); 3373 } 3374 } else { 3375 // This is a open scanner call with numberOfRow = 0, so set more results in region to true. 3376 builder.setMoreResultsInRegion(true); 3377 } 3378 3379 quota.addScanResult(results); 3380 addResults(builder, results, (HBaseRpcController) controller, 3381 RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), 3382 isClientCellBlockSupport(context)); 3383 if (scanner.isFilterDone() && results.isEmpty()) { 3384 // If the scanner's filter - if any - is done with the scan 3385 // only set moreResults to false if the results is empty. This is used to keep compatible 3386 // with the old scan implementation where we just ignore the returned results if moreResults 3387 // is false. Can remove the isEmpty check after we get rid of the old implementation. 3388 builder.setMoreResults(false); 3389 } 3390 // Later we may close the scanner depending on this flag so here we need to make sure that we 3391 // have already set this flag. 
3392 assert builder.hasMoreResultsInRegion(); 3393 // we only set moreResults to false in the above code, so set it to true if we haven't set it 3394 // yet. 3395 if (!builder.hasMoreResults()) { 3396 builder.setMoreResults(true); 3397 } 3398 if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) { 3399 // Record the last cell of the last result if it is a partial result 3400 // We need this to calculate the complete rows we have returned to client as the 3401 // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the 3402 // current row. We may filter out all the remaining cells for the current row and just 3403 // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to 3404 // check for row change. 3405 Result lastResult = results.get(results.size() - 1); 3406 if (lastResult.mayHaveMoreCellsInRow()) { 3407 rsh.rowOfLastPartialResult = lastResult.getRow(); 3408 } else { 3409 rsh.rowOfLastPartialResult = null; 3410 } 3411 } 3412 if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { 3413 scannerClosed = true; 3414 closeScanner(region, scanner, scannerName, context); 3415 } 3416 return builder.build(); 3417 } catch (IOException e) { 3418 try { 3419 // scanner is closed here 3420 scannerClosed = true; 3421 // The scanner state might be left in a dirty state, so we will tell the Client to 3422 // fail this RPC and close the scanner while opening up another one from the start of 3423 // row that the client has last seen. 3424 closeScanner(region, scanner, scannerName, context); 3425 3426 // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is 3427 // used in two different semantics. 3428 // (1) The first is to close the client scanner and bubble up the exception all the way 3429 // to the application. This is preferred when the exception is really un-recoverable 3430 // (like CorruptHFileException, etc). 
Plain DoNotRetryIOException also falls into this 3431 // bucket usually. 3432 // (2) Second semantics is to close the current region scanner only, but continue the 3433 // client scanner by overriding the exception. This is usually UnknownScannerException, 3434 // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the 3435 // application-level ClientScanner has to continue without bubbling up the exception to 3436 // the client. See ClientScanner code to see how it deals with these special exceptions. 3437 if (e instanceof DoNotRetryIOException) { 3438 throw e; 3439 } 3440 3441 // If it is a FileNotFoundException, wrap as a 3442 // DoNotRetryIOException. This can avoid the retry in ClientScanner. 3443 if (e instanceof FileNotFoundException) { 3444 throw new DoNotRetryIOException(e); 3445 } 3446 3447 // We closed the scanner already. Instead of throwing the IOException, and client 3448 // retrying with the same scannerId only to get USE on the next RPC, we directly throw 3449 // a special exception to save an RPC. 3450 if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) { 3451 // 1.4.0+ clients know how to handle 3452 throw new ScannerResetException("Scanner is closed on the server-side", e); 3453 } else { 3454 // older clients do not know about SRE. Just throw USE, which they will handle 3455 throw new UnknownScannerException("Throwing UnknownScannerException to reset the client" 3456 + " scanner state for clients older than 1.3.", e); 3457 } 3458 } catch (IOException ioe) { 3459 throw new ServiceException(ioe); 3460 } 3461 } finally { 3462 if (!scannerClosed) { 3463 // Adding resets expiration time on lease. 3464 // the closeCallBack will be set in closeScanner so here we only care about shippedCallback 3465 if (context != null) { 3466 context.setCallBack(rsh.shippedCallback); 3467 } else { 3468 // When context != null, adding back the lease will be done in callback set above. 
3469 addScannerLeaseBack(lease); 3470 } 3471 } 3472 quota.close(); 3473 } 3474 } 3475 3476 private void closeScanner(HRegion region, RegionScanner scanner, String scannerName, 3477 RpcCallContext context) throws IOException { 3478 if (region.getCoprocessorHost() != null) { 3479 if (region.getCoprocessorHost().preScannerClose(scanner)) { 3480 // bypass the actual close. 3481 return; 3482 } 3483 } 3484 RegionScannerHolder rsh = scanners.remove(scannerName); 3485 if (rsh != null) { 3486 if (context != null) { 3487 context.setCallBack(rsh.closeCallBack); 3488 } else { 3489 rsh.s.close(); 3490 } 3491 if (region.getCoprocessorHost() != null) { 3492 region.getCoprocessorHost().postScannerClose(scanner); 3493 } 3494 closedScanners.put(scannerName, scannerName); 3495 } 3496 } 3497 3498 @Override 3499 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 3500 CoprocessorServiceRequest request) throws ServiceException { 3501 rpcPreCheck("execRegionServerService"); 3502 return regionServer.execRegionServerService(controller, request); 3503 } 3504 3505 @Override 3506 public UpdateConfigurationResponse updateConfiguration( 3507 RpcController controller, UpdateConfigurationRequest request) 3508 throws ServiceException { 3509 try { 3510 this.regionServer.updateConfiguration(); 3511 } catch (Exception e) { 3512 throw new ServiceException(e); 3513 } 3514 return UpdateConfigurationResponse.getDefaultInstance(); 3515 } 3516 3517 @Override 3518 public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots( 3519 RpcController controller, GetSpaceQuotaSnapshotsRequest request) throws ServiceException { 3520 try { 3521 final RegionServerSpaceQuotaManager manager = 3522 regionServer.getRegionServerSpaceQuotaManager(); 3523 final GetSpaceQuotaSnapshotsResponse.Builder builder = 3524 GetSpaceQuotaSnapshotsResponse.newBuilder(); 3525 if (manager != null) { 3526 final Map<TableName,SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots(); 3527 for 
(Entry<TableName,SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) { 3528 builder.addSnapshots(TableQuotaSnapshot.newBuilder() 3529 .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey())) 3530 .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue())) 3531 .build()); 3532 } 3533 } 3534 return builder.build(); 3535 } catch (Exception e) { 3536 throw new ServiceException(e); 3537 } 3538 } 3539 3540 @Override 3541 public ExecuteProceduresResponse executeProcedures(RpcController controller, 3542 ExecuteProceduresRequest request) throws ServiceException { 3543 ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder(); 3544 if (request.getOpenRegionCount() > 0) { 3545 for (OpenRegionRequest req : request.getOpenRegionList()) { 3546 builder.addOpenRegion(openRegion(controller, req)); 3547 } 3548 } 3549 if (request.getCloseRegionCount() > 0) { 3550 for (CloseRegionRequest req : request.getCloseRegionList()) { 3551 builder.addCloseRegion(closeRegion(controller, req)); 3552 } 3553 } 3554 return builder.build(); 3555 } 3556 3557 @Override 3558 public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, 3559 ClearRegionBlockCacheRequest request) { 3560 ClearRegionBlockCacheResponse.Builder builder = 3561 ClearRegionBlockCacheResponse.newBuilder(); 3562 CacheEvictionStatsBuilder stats = CacheEvictionStats.builder(); 3563 List<HRegion> regions = getRegions(request.getRegionList(), stats); 3564 for (HRegion region : regions) { 3565 try { 3566 stats = stats.append(this.regionServer.clearRegionBlockCache(region)); 3567 } catch (Exception e) { 3568 stats.addException(region.getRegionInfo().getRegionName(), e); 3569 } 3570 } 3571 stats.withMaxCacheSize(regionServer.getCacheConfig().getBlockCache().getMaxSize()); 3572 return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build(); 3573 } 3574}