001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.TimeUnit;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ScheduledChore;
029import org.apache.hadoop.hbase.TableDescriptors;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
033import org.apache.hadoop.hbase.client.CompactionState;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.client.TableDescriptor;
037import org.apache.hadoop.hbase.client.TableState;
038import org.apache.hadoop.hbase.master.HMaster;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * Periodic MOB compaction chore. It runs MOB compaction on region servers in parallel, thus
047 * utilizing distributed cluster resources. To avoid possible major compaction storms, one can
048 * specify maximum number regions to be compacted in parallel by setting configuration parameter:
049 * <br>
050 * 'hbase.mob.major.compaction.region.batch.size', which by default is 0 (unlimited).
051 */
052@InterfaceAudience.Private
053public class MobFileCompactionChore extends ScheduledChore {
054
055  private static final Logger LOG = LoggerFactory.getLogger(MobFileCompactionChore.class);
056  private HMaster master;
057  private int regionBatchSize = 0;// not set - compact all
058
059  public MobFileCompactionChore(HMaster master) {
060    super(master.getServerName() + "-MobFileCompactionChore", master,
061      master.getConfiguration().getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
062        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD),
063      master.getConfiguration().getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
064        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD),
065      TimeUnit.SECONDS);
066    this.master = master;
067    this.regionBatchSize =
068      master.getConfiguration().getInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE,
069        MobConstants.DEFAULT_MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE);
070
071  }
072
073  @RestrictedApi(explanation = "Should only be called in tests", link = "",
074      allowedOnPath = ".*/src/test/.*")
075  public MobFileCompactionChore(Configuration conf, int batchSize) {
076    this.regionBatchSize = batchSize;
077  }
078
079  @Override
080  protected void chore() {
081
082    boolean reported = false;
083
084    try (Connection conn = master.getConnection(); Admin admin = conn.getAdmin();) {
085
086      TableDescriptors htds = master.getTableDescriptors();
087      Map<String, TableDescriptor> map = htds.getAll();
088      for (TableDescriptor htd : map.values()) {
089        if (
090          !master.getTableStateManager().isTableState(htd.getTableName(), TableState.State.ENABLED)
091        ) {
092          LOG.info("Skipping MOB compaction on table {} because it is not ENABLED",
093            htd.getTableName());
094          continue;
095        } else {
096          LOG.info("Starting MOB compaction on table {}, checking {} column families",
097            htd.getTableName(), htd.getColumnFamilyCount());
098        }
099        for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
100          try {
101            if (hcd.isMobEnabled()) {
102              if (!reported) {
103                master.reportMobCompactionStart(htd.getTableName());
104                reported = true;
105              }
106              LOG.info("Major MOB compacting table={} cf={}", htd.getTableName(),
107                hcd.getNameAsString());
108              if (regionBatchSize == MobConstants.DEFAULT_MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE) {
109                LOG.debug(
110                  "Table={} cf ={}: batch MOB compaction is disabled, {}=0 -"
111                    + " all regions will be compacted in parallel",
112                  htd.getTableName(), hcd.getNameAsString(), "hbase.mob.compaction.batch.size");
113                admin.majorCompact(htd.getTableName(), hcd.getName());
114              } else {
115                LOG.info(
116                  "Table={} cf={}: performing MOB major compaction in batches "
117                    + "'hbase.mob.compaction.batch.size'={}",
118                  htd.getTableName(), hcd.getNameAsString(), regionBatchSize);
119                performMajorCompactionInBatches(admin, htd, hcd);
120              }
121            } else {
122              LOG.debug("Skipping table={} column family={} because it is not MOB-enabled",
123                htd.getTableName(), hcd.getNameAsString());
124            }
125          } catch (IOException e) {
126            LOG.error("Failed to compact table={} cf={}", htd.getTableName(), hcd.getNameAsString(),
127              e);
128          } catch (InterruptedException ee) {
129            Thread.currentThread().interrupt();
130            master.reportMobCompactionEnd(htd.getTableName());
131            LOG.warn("Failed to compact table={} cf={}", htd.getTableName(), hcd.getNameAsString(),
132              ee);
133            // Quit the chore
134            return;
135          }
136        }
137        if (reported) {
138          master.reportMobCompactionEnd(htd.getTableName());
139          reported = false;
140        }
141      }
142    } catch (IOException e) {
143      LOG.error("Failed to compact", e);
144    }
145  }
146
147  @RestrictedApi(explanation = "Should only be called in tests", link = "",
148      allowedOnPath = ".*(/src/test/.*|MobFileCompactionChore).java")
149  public void performMajorCompactionInBatches(Admin admin, TableDescriptor htd,
150    ColumnFamilyDescriptor hcd) throws IOException, InterruptedException {
151
152    List<RegionInfo> regions = admin.getRegions(htd.getTableName());
153    if (regions.size() <= this.regionBatchSize) {
154      LOG.debug(
155        "Table={} cf={} - performing major MOB compaction in non-batched mode,"
156          + "regions={}, batch size={}",
157        htd.getTableName(), hcd.getNameAsString(), regions.size(), regionBatchSize);
158      admin.majorCompact(htd.getTableName(), hcd.getName());
159      return;
160    }
161    // Shuffle list of regions in case if they come ordered by region server
162    Collections.shuffle(regions);
163    // Create first batch
164    List<RegionInfo> toCompact = new ArrayList<RegionInfo>(this.regionBatchSize);
165    for (int i = 0; i < this.regionBatchSize; i++) {
166      toCompact.add(regions.remove(0));
167    }
168
169    // Start compaction now
170    for (RegionInfo ri : toCompact) {
171      startCompaction(admin, htd.getTableName(), ri, hcd.getName());
172    }
173
174    List<RegionInfo> compacted = new ArrayList<RegionInfo>(toCompact.size());
175    List<RegionInfo> failed = new ArrayList<RegionInfo>();
176    int totalCompacted = 0;
177    while (!toCompact.isEmpty()) {
178      // Check status of active compactions
179      for (RegionInfo ri : toCompact) {
180        try {
181          if (admin.getCompactionStateForRegion(ri.getRegionName()) == CompactionState.NONE) {
182            totalCompacted++;
183            LOG.info("Finished major MOB compaction: table={} cf={} region={} compacted regions={}",
184              htd.getTableName(), hcd.getNameAsString(), ri.getRegionNameAsString(),
185              totalCompacted);
186            compacted.add(ri);
187          }
188        } catch (IOException e) {
189          LOG.error(
190            "Could not get compaction state for table={} cf={} region={}, compaction will"
191              + " aborted for the region.",
192            htd.getTableName(), hcd.getNameAsString(), ri.getEncodedName());
193          LOG.error("Because of:", e);
194          failed.add(ri);
195        }
196      }
197      // Remove failed regions to avoid
198      // endless compaction loop
199      toCompact.removeAll(failed);
200      failed.clear();
201      // Update batch: remove compacted regions and add new ones
202      for (RegionInfo ri : compacted) {
203        toCompact.remove(ri);
204        if (regions.size() > 0) {
205          RegionInfo region = regions.remove(0);
206          toCompact.add(region);
207          startCompaction(admin, htd.getTableName(), region, hcd.getName());
208        }
209      }
210      compacted.clear();
211
212      LOG.debug(
213        "Table={}  cf={}. Wait for 10 sec, toCompact size={} regions left={}"
214          + " compacted so far={}",
215        htd.getTableName(), hcd.getNameAsString(), toCompact.size(), regions.size(),
216        totalCompacted);
217      Thread.sleep(10000);
218    }
219    LOG.info("Finished major MOB compacting table={}. cf={}", htd.getTableName(),
220      hcd.getNameAsString());
221
222  }
223
224  private void startCompaction(Admin admin, TableName table, RegionInfo region, byte[] cf)
225    throws IOException, InterruptedException {
226
227    LOG.info("Started major compaction: table={} cf={} region={}", table, Bytes.toString(cf),
228      region.getRegionNameAsString());
229    admin.majorCompactRegion(region.getRegionName(), cf);
230    // Wait until it really starts
231    // but with finite timeout
232    long waitTime = 300000; // 5 min
233    long startTime = EnvironmentEdgeManager.currentTime();
234    while (admin.getCompactionStateForRegion(region.getRegionName()) == CompactionState.NONE) {
235      // Is 1 second too aggressive?
236      Thread.sleep(1000);
237      if (EnvironmentEdgeManager.currentTime() - startTime > waitTime) {
238        LOG.warn("Waited for {} ms to start major MOB compaction on table={} cf={} region={}."
239          + " Stopped waiting for request confirmation. This is not an ERROR, continue next region.",
240          waitTime, table.getNameAsString(), Bytes.toString(cf), region.getRegionNameAsString());
241        break;
242      }
243    }
244  }
245}