001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.util.concurrent.LinkedBlockingQueue;
021import java.util.concurrent.ThreadPoolExecutor;
022import java.util.concurrent.TimeUnit;
023import org.apache.hadoop.hbase.client.RegionInfo;
024import org.apache.hadoop.hbase.executor.ExecutorService;
025import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
026import org.apache.hadoop.hbase.executor.ExecutorType;
027import org.apache.hadoop.hbase.io.ByteBuffAllocator;
028import org.apache.hadoop.hbase.wal.WAL;
029import org.apache.yetus.audience.InterfaceAudience;
030
031import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
032
033/**
034 * Services a Store needs from a Region. RegionServicesForStores class is the interface through
035 * which memstore access services at the region level. For example, when using alternative memory
036 * formats or due to compaction the memstore needs to take occasional lock and update size counters
037 * at the region level.
038 */
039@InterfaceAudience.Private
040public class RegionServicesForStores {
041
042  private final HRegion region;
043  private final RegionServerServices rsServices;
044  private int inMemoryPoolSize;
045
046  public RegionServicesForStores(HRegion region, RegionServerServices rsServices) {
047    this.region = region;
048    this.rsServices = rsServices;
049    if (this.rsServices != null) {
050      this.inMemoryPoolSize =
051        rsServices.getConfiguration().getInt(CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_KEY,
052          CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT);
053    }
054  }
055
056  public void addMemStoreSize(long dataSizeDelta, long heapSizeDelta, long offHeapSizeDelta,
057    int cellsCountDelta) {
058    region.incMemStoreSize(dataSizeDelta, heapSizeDelta, offHeapSizeDelta, cellsCountDelta);
059  }
060
061  public RegionInfo getRegionInfo() {
062    return region.getRegionInfo();
063  }
064
065  public WAL getWAL() {
066    return region.getWAL();
067  }
068
069  private static ByteBuffAllocator ALLOCATOR_FOR_TEST;
070
071  private static synchronized ByteBuffAllocator getAllocatorForTest() {
072    if (ALLOCATOR_FOR_TEST == null) {
073      ALLOCATOR_FOR_TEST = ByteBuffAllocator.HEAP;
074    }
075    return ALLOCATOR_FOR_TEST;
076  }
077
078  public ByteBuffAllocator getByteBuffAllocator() {
079    if (rsServices != null && rsServices.getRpcServer() != null) {
080      return rsServices.getRpcServer().getByteBuffAllocator();
081    } else {
082      return getAllocatorForTest();
083    }
084  }
085
086  private static ThreadPoolExecutor INMEMORY_COMPACTION_POOL_FOR_TEST;
087
088  private static synchronized ThreadPoolExecutor getInMemoryCompactionPoolForTest() {
089    if (INMEMORY_COMPACTION_POOL_FOR_TEST == null) {
090      INMEMORY_COMPACTION_POOL_FOR_TEST = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS,
091        new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(true)
092          .setNameFormat("InMemoryCompactionsForTest-%d").build());
093    }
094    return INMEMORY_COMPACTION_POOL_FOR_TEST;
095  }
096
097  ThreadPoolExecutor getInMemoryCompactionPool() {
098    if (rsServices != null) {
099      ExecutorService executorService = rsServices.getExecutorService();
100      ExecutorConfig config = executorService.new ExecutorConfig()
101        .setExecutorType(ExecutorType.RS_IN_MEMORY_COMPACTION).setCorePoolSize(inMemoryPoolSize);
102      return executorService.getExecutorLazily(config);
103    } else {
104      // this could only happen in tests
105      return getInMemoryCompactionPoolForTest();
106    }
107  }
108
109  public long getMemStoreFlushSize() {
110    return region.getMemStoreFlushSize();
111  }
112
113  public int getNumStores() {
114    return region.getTableDescriptor().getColumnFamilyCount();
115  }
116
117  long getMemStoreSize() {
118    return region.getMemStoreDataSize();
119  }
120}