001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.TimeUnit;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.ScheduledChore;
028import org.apache.hadoop.hbase.TableDescriptors;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
032import org.apache.hadoop.hbase.client.CompactionState;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.TableDescriptor;
035import org.apache.hadoop.hbase.client.TableState;
036import org.apache.hadoop.hbase.master.HMaster;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Periodic MOB compaction chore.
045 * <p/>
046 * It runs MOB compaction on region servers in parallel, thus utilizing distributed cluster
047 * resources. To avoid possible major compaction storms, one can specify maximum number regions to
048 * be compacted in parallel by setting configuration parameter: <br>
049 * 'hbase.mob.major.compaction.region.batch.size', which by default is 0 (unlimited).
050 */
051@InterfaceAudience.Private
052public class MobFileCompactionChore extends ScheduledChore {
053
054  private static final Logger LOG = LoggerFactory.getLogger(MobFileCompactionChore.class);
055  private HMaster master;
056  private int regionBatchSize = 0;// not set - compact all
057
058  public MobFileCompactionChore(HMaster master) {
059    super(master.getServerName() + "-MobFileCompactionChore", master,
060      master.getConfiguration().getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
061        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD),
062      master.getConfiguration().getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
063        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD),
064      TimeUnit.SECONDS);
065    this.master = master;
066    this.regionBatchSize =
067      master.getConfiguration().getInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE,
068        MobConstants.DEFAULT_MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE);
069
070  }
071
072  public MobFileCompactionChore(Configuration conf, int batchSize) {
073    this.regionBatchSize = batchSize;
074  }
075
076  @Override
077  protected void chore() {
078    boolean reported = false;
079
080    try (Admin admin = master.getConnection().getAdmin()) {
081      TableDescriptors htds = master.getTableDescriptors();
082      Map<String, TableDescriptor> map = htds.getAll();
083      for (TableDescriptor htd : map.values()) {
084        if (
085          !master.getTableStateManager().isTableState(htd.getTableName(), TableState.State.ENABLED)
086        ) {
087          LOG.info("Skipping MOB compaction on table {} because it is not ENABLED",
088            htd.getTableName());
089          continue;
090        } else {
091          LOG.info("Starting MOB compaction on table {}, checking {} column families",
092            htd.getTableName(), htd.getColumnFamilyCount());
093        }
094        for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
095          try {
096            if (hcd.isMobEnabled()) {
097              if (!reported) {
098                master.reportMobCompactionStart(htd.getTableName());
099                reported = true;
100              }
101              LOG.info("Major MOB compacting table={} cf={}", htd.getTableName(),
102                hcd.getNameAsString());
103              if (regionBatchSize == MobConstants.DEFAULT_MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE) {
104                LOG.debug(
105                  "Table={} cf ={}: batch MOB compaction is disabled, {}=0 -"
106                    + " all regions will be compacted in parallel",
107                  htd.getTableName(), hcd.getNameAsString(), "hbase.mob.compaction.batch.size");
108                admin.majorCompact(htd.getTableName(), hcd.getName());
109              } else {
110                LOG.info(
111                  "Table={} cf={}: performing MOB major compaction in batches "
112                    + "'hbase.mob.compaction.batch.size'={}",
113                  htd.getTableName(), hcd.getNameAsString(), regionBatchSize);
114                performMajorCompactionInBatches(admin, htd, hcd);
115              }
116            } else {
117              LOG.debug("Skipping table={} column family={} because it is not MOB-enabled",
118                htd.getTableName(), hcd.getNameAsString());
119            }
120          } catch (IOException e) {
121            LOG.error("Failed to compact table={} cf={}", htd.getTableName(), hcd.getNameAsString(),
122              e);
123          } catch (InterruptedException ee) {
124            Thread.currentThread().interrupt();
125            master.reportMobCompactionEnd(htd.getTableName());
126            LOG.warn("Failed to compact table={} cf={}", htd.getTableName(), hcd.getNameAsString(),
127              ee);
128            // Quit the chore
129            return;
130          }
131        }
132        if (reported) {
133          master.reportMobCompactionEnd(htd.getTableName());
134          reported = false;
135        }
136      }
137    } catch (IOException e) {
138      LOG.error("Failed to compact", e);
139    }
140  }
141
142  public void performMajorCompactionInBatches(Admin admin, TableDescriptor htd,
143    ColumnFamilyDescriptor hcd) throws IOException, InterruptedException {
144
145    List<RegionInfo> regions = admin.getRegions(htd.getTableName());
146    if (regions.size() <= this.regionBatchSize) {
147      LOG.debug(
148        "Table={} cf={} - performing major MOB compaction in non-batched mode,"
149          + "regions={}, batch size={}",
150        htd.getTableName(), hcd.getNameAsString(), regions.size(), regionBatchSize);
151      admin.majorCompact(htd.getTableName(), hcd.getName());
152      return;
153    }
154    // Shuffle list of regions in case if they come ordered by region server
155    Collections.shuffle(regions);
156    // Create first batch
157    List<RegionInfo> toCompact = new ArrayList<RegionInfo>(this.regionBatchSize);
158    for (int i = 0; i < this.regionBatchSize; i++) {
159      toCompact.add(regions.remove(0));
160    }
161
162    // Start compaction now
163    for (RegionInfo ri : toCompact) {
164      startCompaction(admin, htd.getTableName(), ri, hcd.getName());
165    }
166
167    List<RegionInfo> compacted = new ArrayList<RegionInfo>(toCompact.size());
168    List<RegionInfo> failed = new ArrayList<RegionInfo>();
169    int totalCompacted = 0;
170    while (!toCompact.isEmpty()) {
171      // Check status of active compactions
172      for (RegionInfo ri : toCompact) {
173        try {
174          if (admin.getCompactionStateForRegion(ri.getRegionName()) == CompactionState.NONE) {
175            totalCompacted++;
176            LOG.info("Finished major MOB compaction: table={} cf={} region={} compacted regions={}",
177              htd.getTableName(), hcd.getNameAsString(), ri.getRegionNameAsString(),
178              totalCompacted);
179            compacted.add(ri);
180          }
181        } catch (IOException e) {
182          LOG.error(
183            "Could not get compaction state for table={} cf={} region={}, compaction will"
184              + " aborted for the region.",
185            htd.getTableName(), hcd.getNameAsString(), ri.getEncodedName());
186          LOG.error("Because of:", e);
187          failed.add(ri);
188        }
189      }
190      // Remove failed regions to avoid
191      // endless compaction loop
192      toCompact.removeAll(failed);
193      failed.clear();
194      // Update batch: remove compacted regions and add new ones
195      for (RegionInfo ri : compacted) {
196        toCompact.remove(ri);
197        if (regions.size() > 0) {
198          RegionInfo region = regions.remove(0);
199          toCompact.add(region);
200          startCompaction(admin, htd.getTableName(), region, hcd.getName());
201        }
202      }
203      compacted.clear();
204
205      LOG.debug(
206        "Table={}  cf={}. Wait for 10 sec, toCompact size={} regions left={}"
207          + " compacted so far={}",
208        htd.getTableName(), hcd.getNameAsString(), toCompact.size(), regions.size(),
209        totalCompacted);
210      Thread.sleep(10000);
211    }
212    LOG.info("Finished major MOB compacting table={}. cf={}", htd.getTableName(),
213      hcd.getNameAsString());
214
215  }
216
217  private void startCompaction(Admin admin, TableName table, RegionInfo region, byte[] cf)
218    throws IOException, InterruptedException {
219    LOG.info("Started major compaction: table={} cf={} region={}", table, Bytes.toString(cf),
220      region.getRegionNameAsString());
221    admin.majorCompactRegion(region.getRegionName(), cf);
222    // Wait until it really starts
223    // but with finite timeout
224    long waitTime = 300000; // 5 min
225    long startTime = EnvironmentEdgeManager.currentTime();
226    while (admin.getCompactionStateForRegion(region.getRegionName()) == CompactionState.NONE) {
227      // Is 1 second too aggressive?
228      Thread.sleep(1000);
229      if (EnvironmentEdgeManager.currentTime() - startTime > waitTime) {
230        LOG.warn(
231          "Waited for {} ms to start major MOB compaction on table={} cf={} region={}."
232            + " Stopped waiting for request confirmation. This is not an ERROR,"
233            + " continue next region.",
234          waitTime, table.getNameAsString(), Bytes.toString(cf), region.getRegionNameAsString());
235        break;
236      }
237    }
238  }
239}