001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.RejectedExecutionException;
025import java.util.concurrent.SynchronousQueue;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
033import org.apache.hadoop.hbase.master.locking.LockManager;
034import org.apache.hadoop.hbase.mob.MobUtils;
035import org.apache.hadoop.hbase.procedure2.LockType;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
041
042/**
043 * The mob compaction thread used in {@link MasterRpcServices}
044 */
045@InterfaceAudience.Private
046public class MasterMobCompactionThread {
047  static final Logger LOG = LoggerFactory.getLogger(MasterMobCompactionThread.class);
048  private final HMaster master;
049  private final Configuration conf;
050  private final ExecutorService mobCompactorPool;
051  private final ExecutorService masterMobPool;
052
053  public MasterMobCompactionThread(HMaster master) {
054    this.master = master;
055    this.conf = master.getConfiguration();
056    final String n = Thread.currentThread().getName();
057    // this pool is used to run the mob compaction
058    this.masterMobPool = new ThreadPoolExecutor(1, 2, 60,
059        TimeUnit.SECONDS, new SynchronousQueue<>(),
060        new ThreadFactoryBuilder().setDaemon(true)
061            .setNameFormat(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime())
062            .build());
063    ((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
064    // this pool is used in the mob compaction to compact the mob files by partitions
065    // in parallel
066    this.mobCompactorPool = MobUtils
067      .createMobCompactorThreadPool(master.getConfiguration());
068  }
069
070  /**
071   * Requests mob compaction
072   * @param conf The Configuration
073   * @param fs The file system
074   * @param tableName The table the compact
075   * @param columns The column descriptors
076   * @param allFiles Whether add all mob files into the compaction.
077   */
078  public void requestMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
079                                   List<ColumnFamilyDescriptor> columns, boolean allFiles) throws IOException {
080    master.reportMobCompactionStart(tableName);
081    try {
082      masterMobPool.execute(new CompactionRunner(fs, tableName, columns,
083        allFiles, mobCompactorPool));
084    } catch (RejectedExecutionException e) {
085      // in case the request is rejected by the pool
086      try {
087        master.reportMobCompactionEnd(tableName);
088      } catch (IOException e1) {
089        LOG.error("Failed to mark end of mob compaction", e1);
090      }
091      throw e;
092    }
093    if (LOG.isDebugEnabled()) {
094      LOG.debug("The mob compaction is requested for the columns " + columns
095        + " of the table " + tableName.getNameAsString());
096    }
097  }
098
099  private class CompactionRunner implements Runnable {
100    private FileSystem fs;
101    private TableName tableName;
102    private List<ColumnFamilyDescriptor> hcds;
103    private boolean allFiles;
104    private ExecutorService pool;
105
106    public CompactionRunner(FileSystem fs, TableName tableName, List<ColumnFamilyDescriptor> hcds,
107      boolean allFiles, ExecutorService pool) {
108      super();
109      this.fs = fs;
110      this.tableName = tableName;
111      this.hcds = hcds;
112      this.allFiles = allFiles;
113      this.pool = pool;
114    }
115
116    @Override
117    public void run() {
118      // These locks are on dummy table names, and only used for compaction/mob file cleaning.
119      final LockManager.MasterLock lock = master.getLockManager().createMasterLock(
120          MobUtils.getTableLockName(tableName), LockType.EXCLUSIVE,
121          this.getClass().getName() + ": mob compaction");
122      try {
123        for (ColumnFamilyDescriptor hcd : hcds) {
124          MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, allFiles, lock);
125        }
126      } catch (IOException e) {
127        LOG.error("Failed to perform the mob compaction", e);
128      } finally {
129        try {
130          master.reportMobCompactionEnd(tableName);
131        } catch (IOException e) {
132          LOG.error("Failed to mark end of mob compaction", e);
133        }
134      }
135    }
136  }
137
138  /**
139   * Only interrupt once it's done with a run through the work loop.
140   */
141  private void interruptIfNecessary() {
142    mobCompactorPool.shutdown();
143    masterMobPool.shutdown();
144  }
145
146  /**
147   * Wait for all the threads finish.
148   */
149  private void join() {
150    waitFor(mobCompactorPool, "Mob Compaction Thread");
151    waitFor(masterMobPool, "Region Server Mob Compaction Thread");
152  }
153
154  /**
155   * Closes the MasterMobCompactionThread.
156   */
157  public void close() {
158    interruptIfNecessary();
159    join();
160  }
161
162  /**
163   * Wait for thread finish.
164   * @param t the thread to wait
165   * @param name the thread name.
166   */
167  private void waitFor(ExecutorService t, String name) {
168    boolean done = false;
169    while (!done) {
170      try {
171        done = t.awaitTermination(60, TimeUnit.SECONDS);
172        LOG.info("Waiting for " + name + " to finish...");
173        if (!done) {
174          t.shutdownNow();
175        }
176      } catch (InterruptedException ie) {
177        LOG.warn("Interrupted waiting for " + name + " to finish...");
178      }
179    }
180  }
181}