/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodic MOB compaction chore.
 * <p/>
 * It runs MOB compaction on region servers in parallel, thus utilizing distributed cluster
 * resources. To avoid possible major compaction storms, one can specify the maximum number of
 * regions to be compacted in parallel by setting the configuration parameter:<br>
 * 'hbase.mob.major.compaction.region.batch.size', which by default is 0 (unlimited).
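 * <p/>
 * As an illustrative sketch only (the value 10 is arbitrary, not a recommendation), limiting the
 * chore to 10 regions per batch could be configured in hbase-site.xml as follows:
 *
 * <pre>
 * &lt;property&gt;
 *   &lt;name&gt;hbase.mob.major.compaction.region.batch.size&lt;/name&gt;
 *   &lt;value&gt;10&lt;/value&gt;
 * &lt;/property&gt;
 * </pre>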
 */
@InterfaceAudience.Private
public class MobFileCompactionChore extends ScheduledChore {

  private static final Logger LOG = LoggerFactory.getLogger(MobFileCompactionChore.class);
  private HMaster master;
  private int regionBatchSize = 0; // not set - compact all

  public MobFileCompactionChore(HMaster master) {
    super(master.getServerName() + "-MobFileCompactionChore", master,
      master.getConfiguration().getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD),
      master.getConfiguration().getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD),
      TimeUnit.SECONDS);
    this.master = master;
    this.regionBatchSize =
      master.getConfiguration().getInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE,
        MobConstants.DEFAULT_MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE);
  }

  public MobFileCompactionChore(Configuration conf, int batchSize) {
    this.regionBatchSize = batchSize;
  }

  @Override
  protected void chore() {
    boolean reported = false;

    try (Admin admin = master.getConnection().getAdmin()) {
      TableDescriptors htds = master.getTableDescriptors();
      Map<String, TableDescriptor> map = htds.getAll();
      for (TableDescriptor htd : map.values()) {
        if (
          !master.getTableStateManager().isTableState(htd.getTableName(), TableState.State.ENABLED)
        ) {
          LOG.info("Skipping MOB compaction on table {} because it is not ENABLED",
            htd.getTableName());
          continue;
        } else {
          LOG.info("Starting MOB compaction on table {}, checking {} column families",
            htd.getTableName(), htd.getColumnFamilyCount());
        }
        for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
          try {
            if (hcd.isMobEnabled()) {
              if (!reported) {
                master.reportMobCompactionStart(htd.getTableName());
                reported = true;
              }
              LOG.info("Major MOB compacting table={} cf={}", htd.getTableName(),
                hcd.getNameAsString());
              if (regionBatchSize == MobConstants.DEFAULT_MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE) {
                LOG.debug(
                  "Table={} cf={}: batch MOB compaction is disabled, {}=0 -"
                    + " all regions will be compacted in parallel",
                  htd.getTableName(), hcd.getNameAsString(),
                  MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE);
                admin.majorCompact(htd.getTableName(), hcd.getName());
              } else {
                LOG.info("Table={} cf={}: performing MOB major compaction in batches, '{}'={}",
                  htd.getTableName(), hcd.getNameAsString(),
                  MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, regionBatchSize);
                performMajorCompactionInBatches(admin, htd, hcd);
              }
            } else {
              LOG.debug("Skipping table={} column family={} because it is not MOB-enabled",
                htd.getTableName(), hcd.getNameAsString());
            }
          } catch (IOException e) {
            LOG.error("Failed to compact table={} cf={}", htd.getTableName(),
              hcd.getNameAsString(), e);
          } catch (InterruptedException ee) {
            Thread.currentThread().interrupt();
            master.reportMobCompactionEnd(htd.getTableName());
            LOG.warn("Failed to compact table={} cf={}", htd.getTableName(),
              hcd.getNameAsString(), ee);
            // Quit the chore
            return;
          }
        }
        if (reported) {
          master.reportMobCompactionEnd(htd.getTableName());
          reported = false;
        }
      }
    } catch (IOException e) {
      LOG.error("Failed to compact", e);
    }
  }

  public void performMajorCompactionInBatches(Admin admin, TableDescriptor htd,
    ColumnFamilyDescriptor hcd) throws IOException, InterruptedException {

    List<RegionInfo> regions = admin.getRegions(htd.getTableName());
    if (regions.size() <= this.regionBatchSize) {
      LOG.debug(
        "Table={} cf={} - performing major MOB compaction in non-batched mode,"
          + " regions={}, batch size={}",
        htd.getTableName(), hcd.getNameAsString(), regions.size(), regionBatchSize);
      admin.majorCompact(htd.getTableName(), hcd.getName());
      return;
    }
    // Shuffle the list of regions in case they come ordered by region server
    Collections.shuffle(regions);
    // Create the first batch
    List<RegionInfo> toCompact = new ArrayList<RegionInfo>(this.regionBatchSize);
    for (int i = 0; i < this.regionBatchSize; i++) {
      toCompact.add(regions.remove(0));
    }

    // Start compaction now
    for (RegionInfo ri : toCompact) {
      startCompaction(admin, htd.getTableName(), ri, hcd.getName());
    }

    List<RegionInfo> compacted = new ArrayList<RegionInfo>(toCompact.size());
    List<RegionInfo> failed = new ArrayList<RegionInfo>();
    int totalCompacted = 0;
    while (!toCompact.isEmpty()) {
      // Check the status of active compactions
      for (RegionInfo ri : toCompact) {
        try {
          if (admin.getCompactionStateForRegion(ri.getRegionName()) == CompactionState.NONE) {
            totalCompacted++;
            LOG.info("Finished major MOB compaction: table={} cf={} region={} compacted regions={}",
              htd.getTableName(), hcd.getNameAsString(), ri.getRegionNameAsString(),
              totalCompacted);
            compacted.add(ri);
          }
        } catch (IOException e) {
          LOG.error(
            "Could not get compaction state for table={} cf={} region={}, compaction will be"
              + " aborted for the region.",
            htd.getTableName(), hcd.getNameAsString(), ri.getEncodedName());
          LOG.error("Because of:", e);
          failed.add(ri);
        }
      }
      // Remove failed regions to avoid an endless compaction loop
      toCompact.removeAll(failed);
      failed.clear();
      // Update the batch: remove compacted regions and add new ones
      for (RegionInfo ri : compacted) {
        toCompact.remove(ri);
        if (regions.size() > 0) {
          RegionInfo region = regions.remove(0);
          toCompact.add(region);
          startCompaction(admin, htd.getTableName(), region, hcd.getName());
        }
      }
      compacted.clear();

      LOG.debug(
        "Table={} cf={}. Waiting 10 sec, toCompact size={} regions left={}"
          + " compacted so far={}",
        htd.getTableName(), hcd.getNameAsString(), toCompact.size(), regions.size(),
        totalCompacted);
      Thread.sleep(10000);
    }
    LOG.info("Finished major MOB compacting table={} cf={}", htd.getTableName(),
      hcd.getNameAsString());
  }

  private void startCompaction(Admin admin, TableName table, RegionInfo region, byte[] cf)
    throws IOException, InterruptedException {
    LOG.info("Starting major compaction: table={} cf={} region={}", table, Bytes.toString(cf),
      region.getRegionNameAsString());
    admin.majorCompactRegion(region.getRegionName(), cf);
    // Wait until the compaction really starts, but with a finite timeout
    long waitTime = 300000; // 5 min
    long startTime = EnvironmentEdgeManager.currentTime();
    while (admin.getCompactionStateForRegion(region.getRegionName()) == CompactionState.NONE) {
      // Is 1 second too aggressive?
      Thread.sleep(1000);
      if (EnvironmentEdgeManager.currentTime() - startTime > waitTime) {
        LOG.warn(
          "Waited for {} ms to start major MOB compaction on table={} cf={} region={}."
            + " Stopped waiting for request confirmation."
            + " This is not an ERROR, continuing with the next region.",
          waitTime, table.getNameAsString(), Bytes.toString(cf), region.getRegionNameAsString());
        break;
      }
    }
  }
}