001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.util.concurrent.LinkedBlockingQueue; 021import java.util.concurrent.ThreadPoolExecutor; 022import java.util.concurrent.TimeUnit; 023import org.apache.hadoop.hbase.client.RegionInfo; 024import org.apache.hadoop.hbase.executor.ExecutorService; 025import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig; 026import org.apache.hadoop.hbase.executor.ExecutorType; 027import org.apache.hadoop.hbase.io.ByteBuffAllocator; 028import org.apache.hadoop.hbase.keymeta.ManagedKeyDataCache; 029import org.apache.hadoop.hbase.keymeta.SystemKeyCache; 030import org.apache.hadoop.hbase.wal.WAL; 031import org.apache.yetus.audience.InterfaceAudience; 032 033import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 034 035/** 036 * Services a Store needs from a Region. RegionServicesForStores class is the interface through 037 * which memstore access services at the region level. For example, when using alternative memory 038 * formats or due to compaction the memstore needs to take occasional lock and update size counters 039 * at the region level. 040 */ 041@InterfaceAudience.Private 042public class RegionServicesForStores { 043 044 private final HRegion region; 045 private final RegionServerServices rsServices; 046 private int inMemoryPoolSize; 047 048 public RegionServicesForStores(HRegion region, RegionServerServices rsServices) { 049 this.region = region; 050 this.rsServices = rsServices; 051 if (this.rsServices != null) { 052 this.inMemoryPoolSize = 053 rsServices.getConfiguration().getInt(CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_KEY, 054 CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT); 055 } 056 } 057 058 public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta, 059 int cellsCountDelta) { 060 region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta, cellsCountDelta); 061 } 062 063 public RegionInfo getRegionInfo() { 064 return region.getRegionInfo(); 065 } 066 067 public WAL getWAL() { 068 return region.getWAL(); 069 } 070 071 private static ByteBuffAllocator ALLOCATOR_FOR_TEST; 072 073 private static synchronized ByteBuffAllocator getAllocatorForTest() { 074 if (ALLOCATOR_FOR_TEST == null) { 075 ALLOCATOR_FOR_TEST = ByteBuffAllocator.HEAP; 076 } 077 return ALLOCATOR_FOR_TEST; 078 } 079 080 public ByteBuffAllocator getByteBuffAllocator() { 081 if (rsServices != null && rsServices.getRpcServer() != null) { 082 return rsServices.getRpcServer().getByteBuffAllocator(); 083 } else { 084 return getAllocatorForTest(); 085 } 086 } 087 088 private static ThreadPoolExecutor INMEMORY_COMPACTION_POOL_FOR_TEST; 089 090 private static synchronized ThreadPoolExecutor getInMemoryCompactionPoolForTest() { 091 if (INMEMORY_COMPACTION_POOL_FOR_TEST == null) { 092 INMEMORY_COMPACTION_POOL_FOR_TEST = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, 093 new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(true) 094 .setNameFormat("InMemoryCompactionsForTest-%d").build()); 095 } 096 return INMEMORY_COMPACTION_POOL_FOR_TEST; 097 } 098 099 ThreadPoolExecutor getInMemoryCompactionPool() { 100 if (rsServices != null) { 101 ExecutorService executorService = rsServices.getExecutorService(); 102 ExecutorConfig config = executorService.new ExecutorConfig() 103 .setExecutorType(ExecutorType.RS_IN_MEMORY_COMPACTION).setCorePoolSize(inMemoryPoolSize); 104 return executorService.getExecutorLazily(config); 105 } else { 106 // this could only happen in tests 107 return getInMemoryCompactionPoolForTest(); 108 } 109 } 110 111 public long getMemStoreFlushSize() { 112 return region.getMemStoreFlushSize(); 113 } 114 115 public int getNumStores() { 116 return region.getTableDescriptor().getColumnFamilyCount(); 117 } 118 119 long getMemStoreSize() { 120 return region.getMemStoreDataSize(); 121 } 122 123 public ManagedKeyDataCache getManagedKeyDataCache() { 124 return rsServices.getManagedKeyDataCache(); 125 } 126 127 public SystemKeyCache getSystemKeyCache() { 128 return rsServices.getSystemKeyCache(); 129 } 130}