/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodic MOB compaction chore. It runs MOB compaction on region servers in parallel, thus
 * utilizing distributed cluster resources. To avoid possible major compaction storms, one can
 * specify the maximum number of regions to be compacted in parallel by setting the configuration
 * parameter <br>
 * 'hbase.mob.major.compaction.region.batch.size', which by default is 0 (unlimited).
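 * <p>
 * A minimal illustrative sketch, assuming {@code conf} is the master's
 * {@link org.apache.hadoop.conf.Configuration} (the value 10 is an arbitrary example, not a
 * recommendation): limiting each compaction batch to 10 regions can be done by setting the
 * property on the master configuration, for example:
 *
 * <pre>
 * // Limit MOB major compaction to at most 10 regions in flight per column family
 * conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, 10);
 * </pre>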
 */
@InterfaceAudience.Private
public class MobFileCompactionChore extends ScheduledChore {

  private static final Logger LOG = LoggerFactory.getLogger(MobFileCompactionChore.class);
  private HMaster master;
  private int regionBatchSize = 0; // not set - compact all

  public MobFileCompactionChore(HMaster master) {
    super(master.getServerName() + "-MobFileCompactionChore", master,
      master.getConfiguration().getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD),
      master.getConfiguration().getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD),
      TimeUnit.SECONDS);
    this.master = master;
    this.regionBatchSize =
      master.getConfiguration().getInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE,
        MobConstants.DEFAULT_MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE);
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public MobFileCompactionChore(Configuration conf, int batchSize) {
    this.regionBatchSize = batchSize;
  }

  @Override
  protected void chore() {

    boolean reported = false;

    try (Connection conn = master.getConnection(); Admin admin = conn.getAdmin()) {

      TableDescriptors htds = master.getTableDescriptors();
      Map<String, TableDescriptor> map = htds.getAll();
      for (TableDescriptor htd : map.values()) {
        if (
          !master.getTableStateManager().isTableState(htd.getTableName(), TableState.State.ENABLED)
        ) {
          LOG.info("Skipping MOB compaction on table {} because it is not ENABLED",
            htd.getTableName());
          continue;
        } else {
          LOG.info("Starting MOB compaction on table {}, checking {} column families",
            htd.getTableName(), htd.getColumnFamilyCount());
        }
        for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
          try {
            if (hcd.isMobEnabled()) {
              if (!reported) {
                master.reportMobCompactionStart(htd.getTableName());
                reported = true;
              }
              LOG.info("Major MOB compacting table={} cf={}", htd.getTableName(),
                hcd.getNameAsString());
              if (regionBatchSize == MobConstants.DEFAULT_MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE) {
                LOG.debug(
                  "Table={} cf={}: batch MOB compaction is disabled, {}=0 -"
                    + " all regions will be compacted in parallel",
                  htd.getTableName(), hcd.getNameAsString(),
                  MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE);
                admin.majorCompact(htd.getTableName(), hcd.getName());
              } else {
                LOG.info("Table={} cf={}: performing MOB major compaction in batches, '{}'={}",
                  htd.getTableName(), hcd.getNameAsString(),
                  MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, regionBatchSize);
                performMajorCompactionInBatches(admin, htd, hcd);
              }
            } else {
              LOG.debug("Skipping table={} column family={} because it is not MOB-enabled",
                htd.getTableName(), hcd.getNameAsString());
            }
          } catch (IOException e) {
            LOG.error("Failed to compact table={} cf={}", htd.getTableName(),
              hcd.getNameAsString(), e);
          } catch (InterruptedException ee) {
            Thread.currentThread().interrupt();
            master.reportMobCompactionEnd(htd.getTableName());
            LOG.warn("Failed to compact table={} cf={}", htd.getTableName(),
              hcd.getNameAsString(), ee);
            // Quit the chore
            return;
          }
        }
        if (reported) {
          master.reportMobCompactionEnd(htd.getTableName());
          reported = false;
        }
      }
    } catch (IOException e) {
      LOG.error("Failed to compact", e);
    }
  }

  /**
   * Runs a major MOB compaction for the given column family region by region, keeping at most
   * {@code regionBatchSize} region compactions in flight at a time.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*(/src/test/.*|MobFileCompactionChore).java")
  public void performMajorCompactionInBatches(Admin admin, TableDescriptor htd,
    ColumnFamilyDescriptor hcd) throws IOException, InterruptedException {

    List<RegionInfo> regions = admin.getRegions(htd.getTableName());
    if (regions.size() <= this.regionBatchSize) {
      LOG.debug(
        "Table={} cf={} - performing major MOB compaction in non-batched mode,"
          + " regions={}, batch size={}",
        htd.getTableName(), hcd.getNameAsString(), regions.size(), regionBatchSize);
      admin.majorCompact(htd.getTableName(), hcd.getName());
      return;
    }
    // Shuffle the list of regions in case they come ordered by region server
    Collections.shuffle(regions);
    // Create the first batch
    List<RegionInfo> toCompact = new ArrayList<>(this.regionBatchSize);
    for (int i = 0; i < this.regionBatchSize; i++) {
      toCompact.add(regions.remove(0));
    }

    // Start compaction now
    for (RegionInfo ri : toCompact) {
      startCompaction(admin, htd.getTableName(), ri, hcd.getName());
    }

    List<RegionInfo> compacted = new ArrayList<>(toCompact.size());
    List<RegionInfo> failed = new ArrayList<>();
    int totalCompacted = 0;
    while (!toCompact.isEmpty()) {
      // Check the status of active compactions
      for (RegionInfo ri : toCompact) {
        try {
          if (admin.getCompactionStateForRegion(ri.getRegionName()) == CompactionState.NONE) {
            totalCompacted++;
            LOG.info(
              "Finished major MOB compaction: table={} cf={} region={} compacted regions={}",
              htd.getTableName(), hcd.getNameAsString(), ri.getRegionNameAsString(),
              totalCompacted);
            compacted.add(ri);
          }
        } catch (IOException e) {
          LOG.error(
            "Could not get compaction state for table={} cf={} region={}, compaction will be"
              + " aborted for this region.",
            htd.getTableName(), hcd.getNameAsString(), ri.getEncodedName());
          LOG.error("Because of:", e);
          failed.add(ri);
        }
      }
      // Remove failed regions to avoid an endless compaction loop
      toCompact.removeAll(failed);
      failed.clear();
      // Update the batch: remove compacted regions and add new ones
      for (RegionInfo ri : compacted) {
        toCompact.remove(ri);
        if (regions.size() > 0) {
          RegionInfo region = regions.remove(0);
          toCompact.add(region);
          startCompaction(admin, htd.getTableName(), region, hcd.getName());
        }
      }
      compacted.clear();

      LOG.debug(
        "Table={} cf={}. Waiting 10 sec, toCompact size={} regions left={}"
          + " compacted so far={}",
        htd.getTableName(), hcd.getNameAsString(), toCompact.size(), regions.size(),
        totalCompacted);
      Thread.sleep(10000);
    }
    LOG.info("Finished major MOB compaction: table={} cf={}", htd.getTableName(),
      hcd.getNameAsString());
cf={}", htd.getTableName(), 220 hcd.getNameAsString()); 221 222 } 223 224 private void startCompaction(Admin admin, TableName table, RegionInfo region, byte[] cf) 225 throws IOException, InterruptedException { 226 227 LOG.info("Started major compaction: table={} cf={} region={}", table, Bytes.toString(cf), 228 region.getRegionNameAsString()); 229 admin.majorCompactRegion(region.getRegionName(), cf); 230 // Wait until it really starts 231 // but with finite timeout 232 long waitTime = 300000; // 5 min 233 long startTime = EnvironmentEdgeManager.currentTime(); 234 while (admin.getCompactionStateForRegion(region.getRegionName()) == CompactionState.NONE) { 235 // Is 1 second too aggressive? 236 Thread.sleep(1000); 237 if (EnvironmentEdgeManager.currentTime() - startTime > waitTime) { 238 LOG.warn("Waited for {} ms to start major MOB compaction on table={} cf={} region={}." 239 + " Stopped waiting for request confirmation. This is not an ERROR, continue next region.", 240 waitTime, table.getNameAsString(), Bytes.toString(cf), region.getRegionNameAsString()); 241 break; 242 } 243 } 244 } 245}