001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.RejectedExecutionException;
025import java.util.concurrent.SynchronousQueue;
026import java.util.concurrent.ThreadFactory;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.TimeUnit;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
037import org.apache.hadoop.hbase.master.locking.LockManager;
038import org.apache.hadoop.hbase.mob.MobUtils;
039import org.apache.hadoop.hbase.procedure2.LockType;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041
042/**
043 * The mob compaction thread used in {@link MasterRpcServices}
044 */
045@InterfaceAudience.Private
046public class MasterMobCompactionThread {
047  static final Logger LOG = LoggerFactory.getLogger(MasterMobCompactionThread.class);
048  private final HMaster master;
049  private final Configuration conf;
050  private final ExecutorService mobCompactorPool;
051  private final ExecutorService masterMobPool;
052
053  public MasterMobCompactionThread(HMaster master) {
054    this.master = master;
055    this.conf = master.getConfiguration();
056    final String n = Thread.currentThread().getName();
057    // this pool is used to run the mob compaction
058    this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
059      new SynchronousQueue<>(), new ThreadFactory() {
060        @Override
061        public Thread newThread(Runnable r) {
062          String name = n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime();
063          return new Thread(r, name);
064        }
065      });
066    ((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
067    // this pool is used in the mob compaction to compact the mob files by partitions
068    // in parallel
069    this.mobCompactorPool = MobUtils
070      .createMobCompactorThreadPool(master.getConfiguration());
071  }
072
073  /**
074   * Requests mob compaction
075   * @param conf The Configuration
076   * @param fs The file system
077   * @param tableName The table the compact
078   * @param columns The column descriptors
079   * @param allFiles Whether add all mob files into the compaction.
080   */
081  public void requestMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
082                                   List<ColumnFamilyDescriptor> columns, boolean allFiles) throws IOException {
083    master.reportMobCompactionStart(tableName);
084    try {
085      masterMobPool.execute(new CompactionRunner(fs, tableName, columns,
086        allFiles, mobCompactorPool));
087    } catch (RejectedExecutionException e) {
088      // in case the request is rejected by the pool
089      try {
090        master.reportMobCompactionEnd(tableName);
091      } catch (IOException e1) {
092        LOG.error("Failed to mark end of mob compaction", e1);
093      }
094      throw e;
095    }
096    if (LOG.isDebugEnabled()) {
097      LOG.debug("The mob compaction is requested for the columns " + columns
098        + " of the table " + tableName.getNameAsString());
099    }
100  }
101
102  private class CompactionRunner implements Runnable {
103    private FileSystem fs;
104    private TableName tableName;
105    private List<ColumnFamilyDescriptor> hcds;
106    private boolean allFiles;
107    private ExecutorService pool;
108
109    public CompactionRunner(FileSystem fs, TableName tableName, List<ColumnFamilyDescriptor> hcds,
110      boolean allFiles, ExecutorService pool) {
111      super();
112      this.fs = fs;
113      this.tableName = tableName;
114      this.hcds = hcds;
115      this.allFiles = allFiles;
116      this.pool = pool;
117    }
118
119    @Override
120    public void run() {
121      // These locks are on dummy table names, and only used for compaction/mob file cleaning.
122      final LockManager.MasterLock lock = master.getLockManager().createMasterLock(
123          MobUtils.getTableLockName(tableName), LockType.EXCLUSIVE,
124          this.getClass().getName() + ": mob compaction");
125      try {
126        for (ColumnFamilyDescriptor hcd : hcds) {
127          MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, allFiles, lock);
128        }
129      } catch (IOException e) {
130        LOG.error("Failed to perform the mob compaction", e);
131      } finally {
132        try {
133          master.reportMobCompactionEnd(tableName);
134        } catch (IOException e) {
135          LOG.error("Failed to mark end of mob compaction", e);
136        }
137      }
138    }
139  }
140
141  /**
142   * Only interrupt once it's done with a run through the work loop.
143   */
144  private void interruptIfNecessary() {
145    mobCompactorPool.shutdown();
146    masterMobPool.shutdown();
147  }
148
149  /**
150   * Wait for all the threads finish.
151   */
152  private void join() {
153    waitFor(mobCompactorPool, "Mob Compaction Thread");
154    waitFor(masterMobPool, "Region Server Mob Compaction Thread");
155  }
156
157  /**
158   * Closes the MasterMobCompactionThread.
159   */
160  public void close() {
161    interruptIfNecessary();
162    join();
163  }
164
165  /**
166   * Wait for thread finish.
167   * @param t the thread to wait
168   * @param name the thread name.
169   */
170  private void waitFor(ExecutorService t, String name) {
171    boolean done = false;
172    while (!done) {
173      try {
174        done = t.awaitTermination(60, TimeUnit.SECONDS);
175        LOG.info("Waiting for " + name + " to finish...");
176        if (!done) {
177          t.shutdownNow();
178        }
179      } catch (InterruptedException ie) {
180        LOG.warn("Interrupted waiting for " + name + " to finish...");
181      }
182    }
183  }
184}