/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;

/**
 * Tests for the failedBulkLoad logic, making sure staged files are returned to their original
 * location if the bulk load fails.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestSecureBulkloadListener {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSecureBulkloadListener.class);

  @ClassRule
  public static TemporaryFolder testFolder = new TemporaryFolder();
  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtility htu;
  private DistributedFileSystem dfs;
  private final Random random = new Random();
  private final byte[] randomBytes = new byte[100];
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";
  private static final byte[] FAMILY = Bytes.toBytes("family");
  private static final String STAGING_DIR = "staging";
  private static final String CUSTOM_STAGING_DIR = "customStaging";

  @Rule
  public TestName name = new TestName();

  @Before
  public void setUp() throws Exception {
    random.nextBytes(randomBytes);
    htu = new HBaseTestingUtility();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // For the test with multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" },
      new String[] { host1, host2, host3 });

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

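  /**
   * Verifies that prepareBulkLoad moves the source file into the staging directory and that
   * failedBulkLoad moves it back to its original location.
   */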
  @Test
  public void testMovingStagedFile() throws Exception {
    Path stagingDirPath =
      new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR));
    if (!dfs.exists(stagingDirPath)) {
      dfs.mkdirs(stagingDirPath);
    }
    SecureBulkLoadManager.SecureBulkLoadListener listener =
      new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf);

    // creating file to load
    String srcFile = createHFileForFamilies(FAMILY);
    Path srcPath = new Path(srcFile);
    Assert.assertTrue(dfs.exists(srcPath));

    Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY)));
    if (!dfs.exists(stagedFamily)) {
      dfs.mkdirs(stagedFamily);
    }

    // moving file to staging
    String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, false, null);
    Path stagedPath = new Path(stagedFile);
    Assert.assertTrue(dfs.exists(stagedPath));
    Assert.assertFalse(dfs.exists(srcPath));

    // moving files back to original location after a failed bulkload
    listener.failedBulkLoad(FAMILY, stagedFile);
    Assert.assertFalse(dfs.exists(stagedPath));
    Assert.assertTrue(dfs.exists(srcPath));
  }

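  /**
   * Verifies the same move-and-restore behavior as {@link #testMovingStagedFile()}, but with a
   * custom staging directory passed to prepareBulkLoad.
   */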
  @Test
  public void testMovingStagedFileWithCustomStageDir() throws Exception {
    Path stagingDirPath =
      new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR));
    if (!dfs.exists(stagingDirPath)) {
      dfs.mkdirs(stagingDirPath);
    }
    SecureBulkLoadManager.SecureBulkLoadListener listener =
      new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf);

    // creating file to load
    String srcFile = createHFileForFamilies(FAMILY);
    Path srcPath = new Path(srcFile);
    Assert.assertTrue(dfs.exists(srcPath));

    Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY)));
    if (!dfs.exists(stagedFamily)) {
      dfs.mkdirs(stagedFamily);
    }

    Path customStagingDirPath =
      new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), CUSTOM_STAGING_DIR));
    Path customStagedFamily = new Path(customStagingDirPath, new Path(Bytes.toString(FAMILY)));
    if (!dfs.exists(customStagedFamily)) {
      dfs.mkdirs(customStagedFamily);
    }

    // moving file to staging using a custom staging dir
    String stagedFile =
      listener.prepareBulkLoad(FAMILY, srcFile, false, customStagingDirPath.toString());
    Path stagedPath = new Path(stagedFile);
    Assert.assertTrue(dfs.exists(stagedPath));
    Assert.assertFalse(dfs.exists(srcPath));

    // moving files back to original location after a failed bulkload
    listener.failedBulkLoad(FAMILY, stagedFile);
    Assert.assertFalse(dfs.exists(stagedPath));
    Assert.assertTrue(dfs.exists(srcPath));
  }

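  /**
   * Verifies that when the source file is copied (rather than moved) into the staging directory,
   * failedBulkLoad leaves both the staged copy and the original file in place.
   */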
  @Test
  public void testCopiedStagedFile() throws Exception {
    Path stagingDirPath =
      new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR));
    if (!dfs.exists(stagingDirPath)) {
      dfs.mkdirs(stagingDirPath);
    }
    SecureBulkLoadManager.SecureBulkLoadListener listener =
      new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf);

    // creating file to load
    String srcFile = createHFileForFamilies(FAMILY);
    Path srcPath = new Path(srcFile);
    Assert.assertTrue(dfs.exists(srcPath));

    Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY)));
    if (!dfs.exists(stagedFamily)) {
      dfs.mkdirs(stagedFamily);
    }

    // copying file to staging
    String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, true, null);
    Path stagedPath = new Path(stagedFile);
    Assert.assertTrue(dfs.exists(stagedPath));
    Assert.assertTrue(dfs.exists(srcPath));

    // should do nothing because the original file was copied to staging
    listener.failedBulkLoad(FAMILY, stagedFile);
    Assert.assertTrue(dfs.exists(stagedPath));
    Assert.assertTrue(dfs.exists(srcPath));
  }

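  /**
   * Verifies that failedBulkLoad throws an IOException when the staged file has been deleted and
   * can no longer be moved back to its original location.
   */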
  @Test(expected = IOException.class)
  public void testDeletedStagedFile() throws Exception {
    Path stagingDirPath =
      new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR));
    if (!dfs.exists(stagingDirPath)) {
      dfs.mkdirs(stagingDirPath);
    }
    SecureBulkLoadManager.SecureBulkLoadListener listener =
      new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf);

    // creating file to load
    String srcFile = createHFileForFamilies(FAMILY);
    Path srcPath = new Path(srcFile);
    Assert.assertTrue(dfs.exists(srcPath));

    Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY)));
    if (!dfs.exists(stagedFamily)) {
      dfs.mkdirs(stagedFamily);
    }

    // moving file to staging
    String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, false, null);
    Path stagedPath = new Path(stagedFile);
    Assert.assertTrue(dfs.exists(stagedPath));
    Assert.assertFalse(dfs.exists(srcPath));

    dfs.delete(stagedPath, false);

    // moving files back to original location after a failed bulkload
    listener.failedBulkLoad(FAMILY, stagedFile);
  }

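  /** Writes a minimal single-cell HFile for the given family to a per-test directory and returns its path. */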
  private String createHFileForFamilies(byte[] family) throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
    Path testDir =
      new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), Bytes.toString(family)));
    if (!dfs.exists(testDir)) {
      dfs.mkdirs(testDir);
    }
    Path hfilePath = new Path(testDir, generateUniqueName(null));
    FSDataOutputStream out = dfs.createFile(hfilePath).build();
    try {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContextBuilder().build());
      HFile.Writer writer = hFileFactory.create();
      try {
        writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
          .setRow(randomBytes).setFamily(family).setQualifier(randomBytes).setTimestamp(0L)
          .setType(KeyValue.Type.Put.getCode()).setValue(randomBytes).build()));
      } finally {
        writer.close();
      }
    } finally {
      out.close();
    }
    return hfilePath.toString();
  }

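  /** Returns a random UUID-based name (dashes removed), with the given suffix appended when it is not null. */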
  private static String generateUniqueName(final String suffix) {
    String name = UUID.randomUUID().toString().replaceAll("-", "");
    if (suffix != null) {
      name += suffix;
    }
    return name;
  }

}