001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.master; 020 021import java.io.IOException; 022import java.util.List; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.RejectedExecutionException; 025import java.util.concurrent.SynchronousQueue; 026import java.util.concurrent.ThreadFactory; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.TimeUnit; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 037import org.apache.hadoop.hbase.master.locking.LockManager; 038import org.apache.hadoop.hbase.mob.MobUtils; 039import org.apache.hadoop.hbase.procedure2.LockType; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041 042/** 043 * The mob compaction thread used in {@link MasterRpcServices} 044 */ 045@InterfaceAudience.Private 046public class MasterMobCompactionThread { 047 static final Logger LOG = LoggerFactory.getLogger(MasterMobCompactionThread.class); 048 private final HMaster master; 049 private final Configuration conf; 050 private final ExecutorService mobCompactorPool; 051 private final ExecutorService masterMobPool; 052 053 public MasterMobCompactionThread(HMaster master) { 054 this.master = master; 055 this.conf = master.getConfiguration(); 056 final String n = Thread.currentThread().getName(); 057 // this pool is used to run the mob compaction 058 this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, 059 new SynchronousQueue<>(), new ThreadFactory() { 060 @Override 061 public Thread newThread(Runnable r) { 062 String name = n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime(); 063 return new Thread(r, name); 064 } 065 }); 066 ((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true); 067 // this pool is used in the mob compaction to compact the mob files by partitions 068 // in parallel 069 this.mobCompactorPool = MobUtils 070 .createMobCompactorThreadPool(master.getConfiguration()); 071 } 072 073 /** 074 * Requests mob compaction 075 * @param conf The Configuration 076 * @param fs The file system 077 * @param tableName The table the compact 078 * @param columns The column descriptors 079 * @param allFiles Whether add all mob files into the compaction. 080 */ 081 public void requestMobCompaction(Configuration conf, FileSystem fs, TableName tableName, 082 List<ColumnFamilyDescriptor> columns, boolean allFiles) throws IOException { 083 master.reportMobCompactionStart(tableName); 084 try { 085 masterMobPool.execute(new CompactionRunner(fs, tableName, columns, 086 allFiles, mobCompactorPool)); 087 } catch (RejectedExecutionException e) { 088 // in case the request is rejected by the pool 089 try { 090 master.reportMobCompactionEnd(tableName); 091 } catch (IOException e1) { 092 LOG.error("Failed to mark end of mob compaction", e1); 093 } 094 throw e; 095 } 096 if (LOG.isDebugEnabled()) { 097 LOG.debug("The mob compaction is requested for the columns " + columns 098 + " of the table " + tableName.getNameAsString()); 099 } 100 } 101 102 private class CompactionRunner implements Runnable { 103 private FileSystem fs; 104 private TableName tableName; 105 private List<ColumnFamilyDescriptor> hcds; 106 private boolean allFiles; 107 private ExecutorService pool; 108 109 public CompactionRunner(FileSystem fs, TableName tableName, List<ColumnFamilyDescriptor> hcds, 110 boolean allFiles, ExecutorService pool) { 111 super(); 112 this.fs = fs; 113 this.tableName = tableName; 114 this.hcds = hcds; 115 this.allFiles = allFiles; 116 this.pool = pool; 117 } 118 119 @Override 120 public void run() { 121 // These locks are on dummy table names, and only used for compaction/mob file cleaning. 122 final LockManager.MasterLock lock = master.getLockManager().createMasterLock( 123 MobUtils.getTableLockName(tableName), LockType.EXCLUSIVE, 124 this.getClass().getName() + ": mob compaction"); 125 try { 126 for (ColumnFamilyDescriptor hcd : hcds) { 127 MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, allFiles, lock); 128 } 129 } catch (IOException e) { 130 LOG.error("Failed to perform the mob compaction", e); 131 } finally { 132 try { 133 master.reportMobCompactionEnd(tableName); 134 } catch (IOException e) { 135 LOG.error("Failed to mark end of mob compaction", e); 136 } 137 } 138 } 139 } 140 141 /** 142 * Only interrupt once it's done with a run through the work loop. 143 */ 144 private void interruptIfNecessary() { 145 mobCompactorPool.shutdown(); 146 masterMobPool.shutdown(); 147 } 148 149 /** 150 * Wait for all the threads finish. 151 */ 152 private void join() { 153 waitFor(mobCompactorPool, "Mob Compaction Thread"); 154 waitFor(masterMobPool, "Region Server Mob Compaction Thread"); 155 } 156 157 /** 158 * Closes the MasterMobCompactionThread. 159 */ 160 public void close() { 161 interruptIfNecessary(); 162 join(); 163 } 164 165 /** 166 * Wait for thread finish. 167 * @param t the thread to wait 168 * @param name the thread name. 169 */ 170 private void waitFor(ExecutorService t, String name) { 171 boolean done = false; 172 while (!done) { 173 try { 174 done = t.awaitTermination(60, TimeUnit.SECONDS); 175 LOG.info("Waiting for " + name + " to finish..."); 176 if (!done) { 177 t.shutdownNow(); 178 } 179 } catch (InterruptedException ie) { 180 LOG.warn("Interrupted waiting for " + name + " to finish..."); 181 } 182 } 183 } 184}