001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.UUID;
028import java.util.concurrent.TimeUnit;
029import org.apache.commons.lang3.mutable.MutableBoolean;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.FileUtil;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.TableNameTestRule;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.client.RegionInfoBuilder;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.client.TableDescriptor;
044import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
045import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
046import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest;
047import org.apache.hadoop.hbase.testclassification.LargeTests;
048import org.apache.hadoop.hbase.testclassification.RegionServerTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.hadoop.hbase.util.Pair;
052import org.junit.AfterClass;
053import org.junit.Before;
054import org.junit.BeforeClass;
055import org.junit.ClassRule;
056import org.junit.Rule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059
060@Category({ RegionServerTests.class, LargeTests.class })
061public class TestMergesSplitsAddToTracker {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065    HBaseClassTestRule.forClass(TestMergesSplitsAddToTracker.class);
066
067  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
068
069  private static final String FAMILY_NAME_STR = "info";
070
071  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
072
073  @Rule
074  public TableNameTestRule name = new TableNameTestRule();
075
076  @BeforeClass
077  public static void setupClass() throws Exception {
078    TEST_UTIL.startMiniCluster();
079  }
080
081  @AfterClass
082  public static void afterClass() throws Exception {
083    TEST_UTIL.shutdownMiniCluster();
084  }
085
086  @Before
087  public void setup() {
088    StoreFileTrackerForTest.clear();
089  }
090
091  private TableName createTable(byte[] splitKey) throws IOException {
092    TableDescriptor td = TableDescriptorBuilder.newBuilder(name.getTableName())
093      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_NAME))
094      .setValue(TRACKER_IMPL, StoreFileTrackerForTest.class.getName()).build();
095    if (splitKey != null) {
096      TEST_UTIL.getAdmin().createTable(td, new byte[][] { splitKey });
097    } else {
098      TEST_UTIL.getAdmin().createTable(td);
099    }
100    return td.getTableName();
101  }
102
103  @Test
104  public void testCommitDaughterRegion() throws Exception {
105    TableName table = createTable(null);
106    // first put some data in order to have a store file created
107    putThreeRowsAndFlush(table);
108    HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
109    HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem();
110    RegionInfo daughterA =
111      RegionInfoBuilder.newBuilder(table).setStartKey(region.getRegionInfo().getStartKey())
112        .setEndKey(Bytes.toBytes("002")).setSplit(false)
113        .setRegionId(region.getRegionInfo().getRegionId() + EnvironmentEdgeManager.currentTime())
114        .build();
115    RegionInfo daughterB = RegionInfoBuilder.newBuilder(table).setStartKey(Bytes.toBytes("002"))
116      .setEndKey(region.getRegionInfo().getEndKey()).setSplit(false)
117      .setRegionId(region.getRegionInfo().getRegionId()).build();
118    HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
119    List<Path> splitFilesA = new ArrayList<>();
120    splitFilesA.add(regionFS.splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file,
121      Bytes.toBytes("002"), false, region.getSplitPolicy()));
122    List<Path> splitFilesB = new ArrayList<>();
123    splitFilesB.add(regionFS.splitStoreFile(daughterB, Bytes.toString(FAMILY_NAME), file,
124      Bytes.toBytes("002"), true, region.getSplitPolicy()));
125    MasterProcedureEnv env =
126      TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getEnvironment();
127    Path resultA = regionFS.commitDaughterRegion(daughterA, splitFilesA, env);
128    Path resultB = regionFS.commitDaughterRegion(daughterB, splitFilesB, env);
129    FileSystem fs = regionFS.getFileSystem();
130    verifyFilesAreTracked(resultA, fs);
131    verifyFilesAreTracked(resultB, fs);
132  }
133
134  @Test
135  public void testCommitMergedRegion() throws Exception {
136    TableName table = createTable(null);
137    // splitting the table first
138    split(table, Bytes.toBytes("002"));
139    // Add data and flush to create files in the two different regions
140    putThreeRowsAndFlush(table);
141    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
142    HRegion first = regions.get(0);
143    HRegion second = regions.get(1);
144    HRegionFileSystem regionFS = first.getRegionFileSystem();
145
146    RegionInfo mergeResult =
147      RegionInfoBuilder.newBuilder(table).setStartKey(first.getRegionInfo().getStartKey())
148        .setEndKey(second.getRegionInfo().getEndKey()).setSplit(false)
149        .setRegionId(first.getRegionInfo().getRegionId() + EnvironmentEdgeManager.currentTime())
150        .build();
151
152    HRegionFileSystem mergeFS = HRegionFileSystem.createRegionOnFileSystem(
153      TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(), regionFS.getFileSystem(),
154      regionFS.getTableDir(), mergeResult);
155
156    List<Path> mergedFiles = new ArrayList<>();
157    // merge file from first region
158    mergedFiles.add(mergeFileFromRegion(first, mergeFS));
159    // merge file from second region
160    mergedFiles.add(mergeFileFromRegion(second, mergeFS));
161    MasterProcedureEnv env =
162      TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getEnvironment();
163    mergeFS.commitMergedRegion(mergedFiles, env);
164    // validate
165    FileSystem fs = first.getRegionFileSystem().getFileSystem();
166    Path finalMergeDir =
167      new Path(first.getRegionFileSystem().getTableDir(), mergeResult.getEncodedName());
168    verifyFilesAreTracked(finalMergeDir, fs);
169  }
170
171  @Test
172  public void testSplitLoadsFromTracker() throws Exception {
173    TableName table = createTable(null);
174    // Add data and flush to create files in the two different regions
175    putThreeRowsAndFlush(table);
176    HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
177    Pair<StoreFileInfo, String> copyResult = copyFileInTheStoreDir(region);
178    StoreFileInfo fileInfo = copyResult.getFirst();
179    String copyName = copyResult.getSecond();
180    // Now splits the region
181    split(table, Bytes.toBytes("002"));
182    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
183    HRegion first = regions.get(0);
184    validateDaughterRegionsFiles(first, fileInfo.getActiveFileName(), copyName);
185    HRegion second = regions.get(1);
186    validateDaughterRegionsFiles(second, fileInfo.getActiveFileName(), copyName);
187  }
188
189  private void split(TableName table, byte[] splitKey) throws IOException {
190    TEST_UTIL.getAdmin().split(table, splitKey);
191    // wait until split is done
192    TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getHBaseCluster().getRegions(table).size() == 2);
193  }
194
195  @Test
196  public void testMergeLoadsFromTracker() throws Exception {
197    TableName table = createTable(Bytes.toBytes("002"));
198    // Add data and flush to create files in the two different regions
199    putThreeRowsAndFlush(table);
200    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
201    HRegion first = regions.get(0);
202    Pair<StoreFileInfo, String> copyResult = copyFileInTheStoreDir(first);
203    StoreFileInfo fileInfo = copyResult.getFirst();
204    String copyName = copyResult.getSecond();
205    // Now merges the first two regions
206    TEST_UTIL.getAdmin()
207      .mergeRegionsAsync(new byte[][] { first.getRegionInfo().getEncodedNameAsBytes(),
208        regions.get(1).getRegionInfo().getEncodedNameAsBytes() }, true)
209      .get(10, TimeUnit.SECONDS);
210    regions = TEST_UTIL.getHBaseCluster().getRegions(table);
211    HRegion merged = regions.get(0);
212    validateDaughterRegionsFiles(merged, fileInfo.getActiveFileName(), copyName);
213  }
214
215  private Pair<StoreFileInfo, String> copyFileInTheStoreDir(HRegion region) throws IOException {
216    Path storeDir = region.getRegionFileSystem().getStoreDir("info");
217    // gets the single file
218    StoreFileInfo fileInfo = region.getRegionFileSystem().getStoreFiles("info").get(0);
219    // make a copy of the valid file staight into the store dir, so that it's not tracked.
220    String copyName = UUID.randomUUID().toString().replaceAll("-", "");
221    Path copy = new Path(storeDir, copyName);
222    FileUtil.copy(region.getFilesystem(), fileInfo.getFileStatus(), region.getFilesystem(), copy,
223      false, false, TEST_UTIL.getConfiguration());
224    return new Pair<>(fileInfo, copyName);
225  }
226
227  private void validateDaughterRegionsFiles(HRegion region, String orignalFileName,
228    String untrackedFile) throws IOException {
229    // verify there's no link for the untracked, copied file in first region
230    List<StoreFileInfo> infos = region.getRegionFileSystem().getStoreFiles("info");
231    final MutableBoolean foundLink = new MutableBoolean(false);
232    infos.stream().forEach(i -> {
233      if (i.getActiveFileName().contains(untrackedFile)) {
234        fail();
235      }
236      if (i.getActiveFileName().contains(orignalFileName)) {
237        foundLink.setTrue();
238      }
239    });
240    assertTrue(foundLink.booleanValue());
241  }
242
243  private void verifyFilesAreTracked(Path regionDir, FileSystem fs) throws Exception {
244    for (FileStatus f : fs.listStatus(new Path(regionDir, FAMILY_NAME_STR))) {
245      assertTrue(
246        StoreFileTrackerForTest.tracked(regionDir.getName(), FAMILY_NAME_STR, f.getPath()));
247    }
248  }
249
250  private Path mergeFileFromRegion(HRegion regionToMerge, HRegionFileSystem mergeFS)
251    throws IOException {
252    HStoreFile file = (HStoreFile) regionToMerge.getStore(FAMILY_NAME).getStorefiles().toArray()[0];
253    return mergeFS.mergeStoreFile(regionToMerge.getRegionInfo(), Bytes.toString(FAMILY_NAME), file);
254  }
255
256  private void putThreeRowsAndFlush(TableName table) throws IOException {
257    Table tbl = TEST_UTIL.getConnection().getTable(table);
258    Put put = new Put(Bytes.toBytes("001"));
259    byte[] qualifier = Bytes.toBytes("1");
260    put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(1));
261    tbl.put(put);
262    put = new Put(Bytes.toBytes("002"));
263    put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2));
264    tbl.put(put);
265    put = new Put(Bytes.toBytes("003"));
266    put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2));
267    tbl.put(put);
268    TEST_UTIL.flush(table);
269  }
270}