001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.util.concurrent.LinkedBlockingQueue;
021import java.util.concurrent.ThreadPoolExecutor;
022import java.util.concurrent.TimeUnit;
023import org.apache.hadoop.hbase.client.RegionInfo;
024import org.apache.hadoop.hbase.executor.ExecutorService;
025import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
026import org.apache.hadoop.hbase.executor.ExecutorType;
027import org.apache.hadoop.hbase.io.ByteBuffAllocator;
028import org.apache.hadoop.hbase.keymeta.ManagedKeyDataCache;
029import org.apache.hadoop.hbase.keymeta.SystemKeyCache;
030import org.apache.hadoop.hbase.wal.WAL;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
034
035/**
036 * Services a Store needs from a Region. RegionServicesForStores class is the interface through
037 * which memstore access services at the region level. For example, when using alternative memory
038 * formats or due to compaction the memstore needs to take occasional lock and update size counters
039 * at the region level.
040 */
041@InterfaceAudience.Private
042public class RegionServicesForStores {
043
044  private final HRegion region;
045  private final RegionServerServices rsServices;
046  private int inMemoryPoolSize;
047
048  public RegionServicesForStores(HRegion region, RegionServerServices rsServices) {
049    this.region = region;
050    this.rsServices = rsServices;
051    if (this.rsServices != null) {
052      this.inMemoryPoolSize =
053        rsServices.getConfiguration().getInt(CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_KEY,
054          CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT);
055    }
056  }
057
058  public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta,
059    int cellsCountDelta) {
060    region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta, cellsCountDelta);
061  }
062
063  public RegionInfo getRegionInfo() {
064    return region.getRegionInfo();
065  }
066
067  public WAL getWAL() {
068    return region.getWAL();
069  }
070
071  private static ByteBuffAllocator ALLOCATOR_FOR_TEST;
072
073  private static synchronized ByteBuffAllocator getAllocatorForTest() {
074    if (ALLOCATOR_FOR_TEST == null) {
075      ALLOCATOR_FOR_TEST = ByteBuffAllocator.HEAP;
076    }
077    return ALLOCATOR_FOR_TEST;
078  }
079
080  public ByteBuffAllocator getByteBuffAllocator() {
081    if (rsServices != null && rsServices.getRpcServer() != null) {
082      return rsServices.getRpcServer().getByteBuffAllocator();
083    } else {
084      return getAllocatorForTest();
085    }
086  }
087
088  private static ThreadPoolExecutor INMEMORY_COMPACTION_POOL_FOR_TEST;
089
090  private static synchronized ThreadPoolExecutor getInMemoryCompactionPoolForTest() {
091    if (INMEMORY_COMPACTION_POOL_FOR_TEST == null) {
092      INMEMORY_COMPACTION_POOL_FOR_TEST = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS,
093        new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(true)
094          .setNameFormat("InMemoryCompactionsForTest-%d").build());
095    }
096    return INMEMORY_COMPACTION_POOL_FOR_TEST;
097  }
098
099  ThreadPoolExecutor getInMemoryCompactionPool() {
100    if (rsServices != null) {
101      ExecutorService executorService = rsServices.getExecutorService();
102      ExecutorConfig config = executorService.new ExecutorConfig()
103        .setExecutorType(ExecutorType.RS_IN_MEMORY_COMPACTION).setCorePoolSize(inMemoryPoolSize);
104      return executorService.getExecutorLazily(config);
105    } else {
106      // this could only happen in tests
107      return getInMemoryCompactionPoolForTest();
108    }
109  }
110
111  public long getMemStoreFlushSize() {
112    return region.getMemStoreFlushSize();
113  }
114
115  public int getNumStores() {
116    return region.getTableDescriptor().getColumnFamilyCount();
117  }
118
119  long getMemStoreSize() {
120    return region.getMemStoreDataSize();
121  }
122
123  public ManagedKeyDataCache getManagedKeyDataCache() {
124    return rsServices.getManagedKeyDataCache();
125  }
126
127  public SystemKeyCache getSystemKeyCache() {
128    return rsServices.getSystemKeyCache();
129  }
130}