001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.containsString;
022import static org.hamcrest.Matchers.equalTo;
023import static org.hamcrest.Matchers.greaterThan;
024import static org.hamcrest.Matchers.notNullValue;
025import static org.hamcrest.Matchers.nullValue;
026import static org.junit.Assert.assertThrows;
027
028import java.io.IOException;
029import java.util.function.Function;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.LocatedFileStatus;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.fs.RemoteIterator;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseIOException;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.io.hfile.HFile;
044import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
045import org.apache.hadoop.hbase.regionserver.HRegion;
046import org.apache.hadoop.hbase.testclassification.MasterTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.CommonFSUtils;
050import org.apache.hadoop.hbase.util.JVMClusterUtil;
051import org.apache.hadoop.hbase.util.TableDescriptorChecker;
052import org.apache.hadoop.hdfs.DistributedFileSystem;
053import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
054import org.junit.AfterClass;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062@Category({ MasterTests.class, MediumTests.class })
063public class TestManageTableErasureCodingPolicy {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestManageTableErasureCodingPolicy.class);
068  private static final Logger LOG =
069    LoggerFactory.getLogger(TestManageTableErasureCodingPolicy.class);
070
071  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
072  private static final byte[] FAMILY = Bytes.toBytes("a");
073  private static final TableName NON_EC_TABLE = TableName.valueOf("foo");
074  private static final TableDescriptor NON_EC_TABLE_DESC = TableDescriptorBuilder
075    .newBuilder(NON_EC_TABLE).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
076  private static final TableName EC_TABLE = TableName.valueOf("bar");
077  private static final TableDescriptor EC_TABLE_DESC =
078    TableDescriptorBuilder.newBuilder(EC_TABLE).setErasureCodingPolicy("XOR-2-1-1024k")
079      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
080
081  @BeforeClass
082  public static void beforeClass() throws Exception {
083    // enable because we are testing the checks below
084    UTIL.getConfiguration().setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, true);
085    UTIL.startMiniDFSCluster(3); // 3 necessary for XOR-2-1-1024k
086    UTIL.startMiniCluster(1);
087    DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(UTIL.getConfiguration());
088    fs.enableErasureCodingPolicy("XOR-2-1-1024k");
089    fs.enableErasureCodingPolicy("RS-6-3-1024k");
090    Table table = UTIL.createTable(NON_EC_TABLE_DESC, null);
091    UTIL.loadTable(table, FAMILY);
092    UTIL.flush();
093  }
094
095  @AfterClass
096  public static void afterClass() throws Exception {
097    UTIL.shutdownMiniCluster();
098    UTIL.shutdownMiniDFSCluster();
099  }
100
101  @Test
102  public void itValidatesPolicyNameForCreate() {
103    runValidatePolicyNameTest(unused -> EC_TABLE_DESC, Admin::createTable);
104  }
105
106  @Test
107  public void itValidatesPolicyNameForAlter() {
108    runValidatePolicyNameTest(admin -> {
109      try {
110        return admin.getDescriptor(NON_EC_TABLE);
111      } catch (IOException e) {
112        throw new RuntimeException(e);
113      }
114    }, Admin::modifyTable);
115  }
116
117  @FunctionalInterface
118  interface ThrowingTableDescriptorConsumer {
119    void accept(Admin admin, TableDescriptor desc) throws IOException;
120  }
121
122  private void runValidatePolicyNameTest(Function<Admin, TableDescriptor> descriptorSupplier,
123    ThrowingTableDescriptorConsumer consumer) {
124    HBaseIOException thrown = assertThrows(HBaseIOException.class, () -> {
125      try (Admin admin = UTIL.getAdmin()) {
126        TableDescriptor desc = descriptorSupplier.apply(admin);
127        consumer.accept(admin,
128          TableDescriptorBuilder.newBuilder(desc).setErasureCodingPolicy("foo").build());
129      }
130    });
131    assertThat(thrown.getMessage(),
132      containsString("Cannot set Erasure Coding policy: foo. Policy not found"));
133
134    thrown = assertThrows(HBaseIOException.class, () -> {
135      try (Admin admin = UTIL.getAdmin()) {
136        TableDescriptor desc = descriptorSupplier.apply(admin);
137        consumer.accept(admin,
138          TableDescriptorBuilder.newBuilder(desc).setErasureCodingPolicy("RS-10-4-1024k").build());
139      }
140    });
141    assertThat(thrown.getMessage(), containsString(
142      "Cannot set Erasure Coding policy: RS-10-4-1024k. The policy must be enabled"));
143
144    // RS-6-3-1024k requires at least 6 datanodes, so should fail write test
145    thrown = assertThrows(HBaseIOException.class, () -> {
146      try (Admin admin = UTIL.getAdmin()) {
147        TableDescriptor desc = descriptorSupplier.apply(admin);
148        consumer.accept(admin,
149          TableDescriptorBuilder.newBuilder(desc).setErasureCodingPolicy("RS-6-3-1024k").build());
150      }
151    });
152    assertThat(thrown.getMessage(), containsString("Failed write test for EC policy"));
153  }
154
155  @Test
156  public void testCreateTableErasureCodingSync() throws IOException {
157    try (Admin admin = UTIL.getAdmin()) {
158      recreateTable(admin, EC_TABLE_DESC);
159      UTIL.flush(EC_TABLE);
160      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
161      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(UTIL.getConfiguration());
162      checkRegionDirAndFilePolicies(dfs, rootDir, EC_TABLE, "XOR-2-1-1024k", "XOR-2-1-1024k");
163    }
164  }
165
166  private void recreateTable(Admin admin, TableDescriptor desc) throws IOException {
167    if (admin.tableExists(desc.getTableName())) {
168      admin.disableTable(desc.getTableName());
169      admin.deleteTable(desc.getTableName());
170    }
171    admin.createTable(desc);
172    try (Table table = UTIL.getConnection().getTable(desc.getTableName())) {
173      UTIL.loadTable(table, FAMILY);
174    }
175  }
176
177  @Test
178  public void testModifyTableErasureCodingSync() throws IOException, InterruptedException {
179    try (Admin admin = UTIL.getAdmin()) {
180      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
181      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(UTIL.getConfiguration());
182
183      // start off without EC
184      checkRegionDirAndFilePolicies(dfs, rootDir, NON_EC_TABLE, null, null);
185
186      // add EC
187      TableDescriptor desc = UTIL.getAdmin().getDescriptor(NON_EC_TABLE);
188      TableDescriptor newDesc =
189        TableDescriptorBuilder.newBuilder(desc).setErasureCodingPolicy("XOR-2-1-1024k").build();
190      admin.modifyTable(newDesc);
191
192      // check dirs, but files should not be changed yet
193      checkRegionDirAndFilePolicies(dfs, rootDir, NON_EC_TABLE, "XOR-2-1-1024k", null);
194
195      compactAwayOldFiles(NON_EC_TABLE);
196
197      // expect both dirs and files to be EC now
198      checkRegionDirAndFilePolicies(dfs, rootDir, NON_EC_TABLE, "XOR-2-1-1024k", "XOR-2-1-1024k");
199
200      newDesc = TableDescriptorBuilder.newBuilder(newDesc).setErasureCodingPolicy(null).build();
201      // remove EC now
202      admin.modifyTable(newDesc);
203
204      // dirs should no longer be EC, but old EC files remain
205      checkRegionDirAndFilePolicies(dfs, rootDir, NON_EC_TABLE, null, "XOR-2-1-1024k");
206
207      // compact to rewrite EC files without EC, then run discharger to get rid of the old EC files
208      UTIL.compact(NON_EC_TABLE, true);
209      for (JVMClusterUtil.RegionServerThread regionserver : UTIL.getHBaseCluster()
210        .getLiveRegionServerThreads()) {
211        CompactedHFilesDischarger chore =
212          regionserver.getRegionServer().getCompactedHFilesDischarger();
213        chore.setUseExecutor(false);
214        chore.chore();
215      }
216
217      checkRegionDirAndFilePolicies(dfs, rootDir, NON_EC_TABLE, null, null);
218    }
219  }
220
221  private void compactAwayOldFiles(TableName tableName) throws IOException {
222    LOG.info("Compacting and discharging files for {}", tableName);
223    // compact to rewrit files, then run discharger to get rid of the old files
224    UTIL.compact(tableName, true);
225    for (JVMClusterUtil.RegionServerThread regionserver : UTIL.getHBaseCluster()
226      .getLiveRegionServerThreads()) {
227      CompactedHFilesDischarger chore =
228        regionserver.getRegionServer().getCompactedHFilesDischarger();
229      chore.setUseExecutor(false);
230      chore.chore();
231    }
232  }
233
234  @Test
235  public void testRestoreSnapshot() throws IOException {
236    String snapshotName = "testRestoreSnapshot_snap";
237    TableName tableName = TableName.valueOf("testRestoreSnapshot_tbl");
238    try (Admin admin = UTIL.getAdmin()) {
239      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
240      DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(UTIL.getConfiguration());
241
242      // recreate EC test table and load it
243      recreateTable(admin, EC_TABLE_DESC);
244
245      // Take a snapshot, then clone it into a new table
246      admin.snapshot(snapshotName, EC_TABLE);
247      admin.cloneSnapshot(snapshotName, tableName);
248      compactAwayOldFiles(tableName);
249
250      // Verify the new table has the right EC policy
251      checkRegionDirAndFilePolicies(dfs, rootDir, tableName, "XOR-2-1-1024k", "XOR-2-1-1024k");
252
253      // Remove the EC policy from the EC test table, and verify that worked
254      admin.modifyTable(
255        TableDescriptorBuilder.newBuilder(EC_TABLE_DESC).setErasureCodingPolicy(null).build());
256      compactAwayOldFiles(EC_TABLE);
257      checkRegionDirAndFilePolicies(dfs, rootDir, EC_TABLE, null, null);
258
259      // Restore snapshot, and then verify it has the policy again
260      admin.disableTable(EC_TABLE);
261      admin.restoreSnapshot(snapshotName);
262      admin.enableTable(EC_TABLE);
263      compactAwayOldFiles(EC_TABLE);
264      checkRegionDirAndFilePolicies(dfs, rootDir, EC_TABLE, "XOR-2-1-1024k", "XOR-2-1-1024k");
265    }
266  }
267
268  private void checkRegionDirAndFilePolicies(DistributedFileSystem dfs, Path rootDir,
269    TableName testTable, String expectedDirPolicy, String expectedFilePolicy) throws IOException {
270    Path tableDir = CommonFSUtils.getTableDir(rootDir, testTable);
271    checkPolicy(dfs, tableDir, expectedDirPolicy);
272
273    int filesMatched = 0;
274    for (HRegion region : UTIL.getHBaseCluster().getRegions(testTable)) {
275      Path regionDir = new Path(tableDir, region.getRegionInfo().getEncodedName());
276      checkPolicy(dfs, regionDir, expectedDirPolicy);
277      RemoteIterator<LocatedFileStatus> itr = dfs.listFiles(regionDir, true);
278      while (itr.hasNext()) {
279        LocatedFileStatus fileStatus = itr.next();
280        Path path = fileStatus.getPath();
281        if (!HFile.isHFileFormat(dfs, path)) {
282          LOG.info("{} is not an hfile", path);
283          continue;
284        }
285        filesMatched++;
286        checkPolicy(dfs, path, expectedFilePolicy);
287      }
288    }
289    assertThat(filesMatched, greaterThan(0));
290  }
291
292  private void checkPolicy(DistributedFileSystem dfs, Path path, String expectedPolicy)
293    throws IOException {
294    ErasureCodingPolicy policy = dfs.getErasureCodingPolicy(path);
295    if (expectedPolicy == null) {
296      assertThat("policy for " + path, policy, nullValue());
297    } else {
298      assertThat("policy for " + path, policy, notNullValue());
299      assertThat("policy for " + path, policy.getName(), equalTo(expectedPolicy));
300    }
301  }
302}