001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY;
021import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.List;
028import java.util.Optional;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.ExtendedCell;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Durability;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.client.RegionInfoBuilder;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.client.TableDescriptor;
044import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
045import org.apache.hadoop.hbase.regionserver.CellSink;
046import org.apache.hadoop.hbase.regionserver.HMobStore;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.regionserver.HStore;
049import org.apache.hadoop.hbase.regionserver.InternalScanner;
050import org.apache.hadoop.hbase.regionserver.RegionAsTable;
051import org.apache.hadoop.hbase.regionserver.ScannerContext;
052import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
053import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
054import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
055import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
056import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
057import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
058import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
059import org.apache.hadoop.hbase.security.User;
060import org.apache.hadoop.hbase.testclassification.MediumTests;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.junit.jupiter.api.AfterEach;
063import org.junit.jupiter.api.BeforeAll;
064import org.junit.jupiter.api.BeforeEach;
065import org.junit.jupiter.api.Tag;
066import org.junit.jupiter.api.Test;
067import org.junit.jupiter.api.TestInfo;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071@Tag(MediumTests.TAG)
072public class TestMobCompactionWithException {
073  static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithException.class.getName());
074  private final static HBaseTestingUtil HTU = new HBaseTestingUtil();
075  private static Configuration conf = null;
076  private String testMethodName;
077
078  private HRegion region = null;
079  private TableDescriptor tableDescriptor;
080  private ColumnFamilyDescriptor columnFamilyDescriptor;
081  private FileSystem fs;
082
083  private static final byte[] COLUMN_FAMILY = fam1;
084  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
085  private static volatile boolean testException = false;
086  private static int rowCount = 100;
087  private Table table;
088
089  @BeforeAll
090  public static void setUp() throws Exception {
091    conf = HTU.getConfiguration();
092    conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
093    conf.set(MobStoreEngine.MOB_COMPACTOR_CLASS_KEY, MyMobStoreCompactor.class.getName());
094
095  }
096
097  @BeforeEach
098  public void setUp(TestInfo testInfo) {
099    testMethodName = testInfo.getTestMethod().get().getName();
100  }
101
102  @AfterEach
103  public void tearDown() throws Exception {
104    region.close();
105    this.table.close();
106    fs.delete(HTU.getDataTestDir(), true);
107  }
108
109  private void createTable(long mobThreshold) throws IOException {
110
111    this.columnFamilyDescriptor =
112      ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true)
113        .setMobThreshold(mobThreshold).setMaxVersions(1).setBlocksize(500).build();
114    this.tableDescriptor = TableDescriptorBuilder
115      .newBuilder(TableName.valueOf(TestMobUtils.getTableName(testMethodName)))
116      .setColumnFamily(columnFamilyDescriptor).build();
117    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
118    region = HBaseTestingUtil.createRegionAndWAL(regionInfo, HTU.getDataTestDir(), conf,
119      tableDescriptor, new MobFileCache(conf));
120    this.table = new RegionAsTable(region);
121    fs = FileSystem.get(conf);
122  }
123
124  /**
125   * This test is for HBASE-27433.
126   */
127  @Test
128  public void testMobStoreFileDeletedWhenCompactException() throws Exception {
129    this.createTable(200);
130    byte[] dummyData = makeDummyData(1000); // larger than mob threshold
131    for (int i = 0; i < rowCount; i++) {
132      Put p = createPut(i, dummyData);
133      table.put(p);
134      region.flush(true);
135    }
136
137    int storeFileCountBeforeCompact = countStoreFiles();
138    int mobFileCountBeforeCompact = countMobFiles();
139    long mobFileByteSize = getMobFileByteSize();
140
141    List<HStore> stores = region.getStores();
142    assertTrue(stores.size() == 1);
143    HMobStore mobStore = (HMobStore) stores.get(0);
144    Compactor<?> compactor = mobStore.getStoreEngine().getCompactor();
145    MyMobStoreCompactor myMobStoreCompactor = (MyMobStoreCompactor) compactor;
146    myMobStoreCompactor.setMobFileMaxByteSize(mobFileByteSize + 100);
147    testException = true;
148    try {
149      try {
150
151        // Force major compaction
152        mobStore.triggerMajorCompaction();
153        Optional<CompactionContext> context = mobStore.requestCompaction(HStore.PRIORITY_USER,
154          CompactionLifeCycleTracker.DUMMY, User.getCurrent());
155        assertTrue(context.isPresent());
156        region.compact(context.get(), mobStore, NoLimitThroughputController.INSTANCE,
157          User.getCurrent());
158
159        fail();
160      } catch (IOException e) {
161        assertTrue(e != null);
162      }
163    } finally {
164      testException = false;
165    }
166
167    // When compaction is failed,the count of StoreFile and MobStoreFile should be the same as
168    // before compaction.
169    assertEquals(storeFileCountBeforeCompact, countStoreFiles(), "After compaction: store files");
170    assertEquals(mobFileCountBeforeCompact, countMobFiles(), "After compaction: mob file count");
171  }
172
173  private int countStoreFiles() throws IOException {
174    HStore store = region.getStore(COLUMN_FAMILY);
175    return store.getStorefilesCount();
176  }
177
178  private int countMobFiles() throws IOException {
179    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
180      columnFamilyDescriptor.getNameAsString());
181    if (fs.exists(mobDirPath)) {
182      FileStatus[] files = HTU.getTestFileSystem().listStatus(mobDirPath);
183      return files.length;
184    }
185    return 0;
186  }
187
188  private long getMobFileByteSize() throws IOException {
189    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
190      columnFamilyDescriptor.getNameAsString());
191    if (fs.exists(mobDirPath)) {
192      FileStatus[] files = HTU.getTestFileSystem().listStatus(mobDirPath);
193      if (files.length > 0) {
194        return files[0].getLen();
195      }
196    }
197    return 0;
198  }
199
200  private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
201    Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
202    p.setDurability(Durability.SKIP_WAL);
203    p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
204    return p;
205  }
206
207  private byte[] makeDummyData(int size) {
208    byte[] dummyData = new byte[size];
209    Bytes.random(dummyData);
210    return dummyData;
211  }
212
213  public static class MyMobStoreCompactor extends DefaultMobStoreCompactor {
214    public MyMobStoreCompactor(Configuration conf, HStore store) {
215      super(conf, store);
216
217    }
218
219    public void setMobFileMaxByteSize(long maxByteSize) {
220      this.conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, maxByteSize);
221    }
222
223    @Override
224    protected boolean performCompaction(FileDetails fd, final InternalScanner scanner,
225      CellSink writer, long smallestReadPoint, boolean cleanSeqId,
226      ThroughputController throughputController, CompactionRequestImpl request,
227      CompactionProgress progress) throws IOException {
228
229      InternalScanner wrappedScanner = new InternalScanner() {
230
231        private int count = -1;
232
233        @Override
234        public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
235          throws IOException {
236          count++;
237          if (count == rowCount - 1 && testException) {
238            count = 0;
239            throw new IOException("Inject Error");
240          }
241          return scanner.next(result, scannerContext);
242        }
243
244        @Override
245        public void close() throws IOException {
246          scanner.close();
247        }
248      };
249      return super.performCompaction(fd, wrappedScanner, writer, smallestReadPoint, cleanSeqId,
250        throughputController, request, progress);
251    }
252  }
253}