001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.master; 020 021import java.io.IOException; 022import java.util.List; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.RejectedExecutionException; 025import java.util.concurrent.SynchronousQueue; 026import java.util.concurrent.ThreadPoolExecutor; 027import java.util.concurrent.TimeUnit; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 033import org.apache.hadoop.hbase.master.locking.LockManager; 034import org.apache.hadoop.hbase.mob.MobUtils; 035import org.apache.hadoop.hbase.procedure2.LockType; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 041 042/** 043 * The mob compaction thread used in {@link MasterRpcServices} 044 */ 045@InterfaceAudience.Private 046public class MasterMobCompactionThread { 047 static final Logger LOG = LoggerFactory.getLogger(MasterMobCompactionThread.class); 048 private final HMaster master; 049 private final Configuration conf; 050 private final ExecutorService mobCompactorPool; 051 private final ExecutorService masterMobPool; 052 053 public MasterMobCompactionThread(HMaster master) { 054 this.master = master; 055 this.conf = master.getConfiguration(); 056 final String n = Thread.currentThread().getName(); 057 // this pool is used to run the mob compaction 058 this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, 059 TimeUnit.SECONDS, new SynchronousQueue<>(), 060 new ThreadFactoryBuilder().setDaemon(true) 061 .setNameFormat(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime()) 062 .build()); 063 ((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true); 064 // this pool is used in the mob compaction to compact the mob files by partitions 065 // in parallel 066 this.mobCompactorPool = MobUtils 067 .createMobCompactorThreadPool(master.getConfiguration()); 068 } 069 070 /** 071 * Requests mob compaction 072 * @param conf The Configuration 073 * @param fs The file system 074 * @param tableName The table the compact 075 * @param columns The column descriptors 076 * @param allFiles Whether add all mob files into the compaction. 077 */ 078 public void requestMobCompaction(Configuration conf, FileSystem fs, TableName tableName, 079 List<ColumnFamilyDescriptor> columns, boolean allFiles) throws IOException { 080 master.reportMobCompactionStart(tableName); 081 try { 082 masterMobPool.execute(new CompactionRunner(fs, tableName, columns, 083 allFiles, mobCompactorPool)); 084 } catch (RejectedExecutionException e) { 085 // in case the request is rejected by the pool 086 try { 087 master.reportMobCompactionEnd(tableName); 088 } catch (IOException e1) { 089 LOG.error("Failed to mark end of mob compaction", e1); 090 } 091 throw e; 092 } 093 if (LOG.isDebugEnabled()) { 094 LOG.debug("The mob compaction is requested for the columns " + columns 095 + " of the table " + tableName.getNameAsString()); 096 } 097 } 098 099 private class CompactionRunner implements Runnable { 100 private FileSystem fs; 101 private TableName tableName; 102 private List<ColumnFamilyDescriptor> hcds; 103 private boolean allFiles; 104 private ExecutorService pool; 105 106 public CompactionRunner(FileSystem fs, TableName tableName, List<ColumnFamilyDescriptor> hcds, 107 boolean allFiles, ExecutorService pool) { 108 super(); 109 this.fs = fs; 110 this.tableName = tableName; 111 this.hcds = hcds; 112 this.allFiles = allFiles; 113 this.pool = pool; 114 } 115 116 @Override 117 public void run() { 118 // These locks are on dummy table names, and only used for compaction/mob file cleaning. 119 final LockManager.MasterLock lock = master.getLockManager().createMasterLock( 120 MobUtils.getTableLockName(tableName), LockType.EXCLUSIVE, 121 this.getClass().getName() + ": mob compaction"); 122 try { 123 for (ColumnFamilyDescriptor hcd : hcds) { 124 MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, allFiles, lock); 125 } 126 } catch (IOException e) { 127 LOG.error("Failed to perform the mob compaction", e); 128 } finally { 129 try { 130 master.reportMobCompactionEnd(tableName); 131 } catch (IOException e) { 132 LOG.error("Failed to mark end of mob compaction", e); 133 } 134 } 135 } 136 } 137 138 /** 139 * Only interrupt once it's done with a run through the work loop. 140 */ 141 private void interruptIfNecessary() { 142 mobCompactorPool.shutdown(); 143 masterMobPool.shutdown(); 144 } 145 146 /** 147 * Wait for all the threads finish. 148 */ 149 private void join() { 150 waitFor(mobCompactorPool, "Mob Compaction Thread"); 151 waitFor(masterMobPool, "Region Server Mob Compaction Thread"); 152 } 153 154 /** 155 * Closes the MasterMobCompactionThread. 156 */ 157 public void close() { 158 interruptIfNecessary(); 159 join(); 160 } 161 162 /** 163 * Wait for thread finish. 164 * @param t the thread to wait 165 * @param name the thread name. 166 */ 167 private void waitFor(ExecutorService t, String name) { 168 boolean done = false; 169 while (!done) { 170 try { 171 done = t.awaitTermination(60, TimeUnit.SECONDS); 172 LOG.info("Waiting for " + name + " to finish..."); 173 if (!done) { 174 t.shutdownNow(); 175 } 176 } catch (InterruptedException ie) { 177 LOG.warn("Interrupted waiting for " + name + " to finish..."); 178 } 179 } 180 } 181}