001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.UUID;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.fs.FSDataOutputStream;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.CellBuilderType;
027import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.KeyValue;
031import org.apache.hadoop.hbase.io.hfile.HFile;
032import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
033import org.apache.hadoop.hbase.testclassification.LargeTests;
034import org.apache.hadoop.hbase.testclassification.MiscTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hdfs.DistributedFileSystem;
037import org.apache.hadoop.hdfs.MiniDFSCluster;
038import org.junit.After;
039import org.junit.Assert;
040import org.junit.Before;
041import org.junit.ClassRule;
042import org.junit.Rule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.junit.rules.TemporaryFolder;
046import org.junit.rules.TestName;
047
048/**
049 * Tests for failedBulkLoad logic to make sure staged files are returned to their original location
050 * if the bulkload have failed.
051 */
052@Category({ MiscTests.class, LargeTests.class })
053public class TestSecureBulkloadListener {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestSecureBulkloadListener.class);
058
059  @ClassRule
060  public static TemporaryFolder testFolder = new TemporaryFolder();
061  private Configuration conf;
062  private MiniDFSCluster cluster;
063  private HBaseTestingUtil htu;
064  private DistributedFileSystem dfs;
065  private final byte[] randomBytes = new byte[100];
066  private static final String host1 = "host1";
067  private static final String host2 = "host2";
068  private static final String host3 = "host3";
069  private static byte[] FAMILY = Bytes.toBytes("family");
070  private static final String STAGING_DIR = "staging";
071  private static final String CUSTOM_STAGING_DIR = "customStaging";
072
073  @Rule
074  public TestName name = new TestName();
075
076  @Before
077  public void setUp() throws Exception {
078    Bytes.random(randomBytes);
079    htu = new HBaseTestingUtil();
080    htu.getConfiguration().setInt("dfs.blocksize", 1024);// For the test with multiple blocks
081    htu.getConfiguration().setInt("dfs.replication", 3);
082    htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" },
083      new String[] { host1, host2, host3 });
084
085    conf = htu.getConfiguration();
086    cluster = htu.getDFSCluster();
087    dfs = (DistributedFileSystem) FileSystem.get(conf);
088  }
089
090  @After
091  public void tearDownAfterClass() throws Exception {
092    htu.shutdownMiniCluster();
093  }
094
095  @Test
096  public void testMovingStagedFile() throws Exception {
097    Path stagingDirPath =
098      new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR));
099    if (!dfs.exists(stagingDirPath)) {
100      dfs.mkdirs(stagingDirPath);
101    }
102    SecureBulkLoadManager.SecureBulkLoadListener listener =
103      new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf);
104
105    // creating file to load
106    String srcFile = createHFileForFamilies(FAMILY);
107    Path srcPath = new Path(srcFile);
108    Assert.assertTrue(dfs.exists(srcPath));
109
110    Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY)));
111    if (!dfs.exists(stagedFamily)) {
112      dfs.mkdirs(stagedFamily);
113    }
114
115    // moving file to staging
116    String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, false, null);
117    Path stagedPath = new Path(stagedFile);
118    Assert.assertTrue(dfs.exists(stagedPath));
119    Assert.assertFalse(dfs.exists(srcPath));
120
121    // moving files back to original location after a failed bulkload
122    listener.failedBulkLoad(FAMILY, stagedFile);
123    Assert.assertFalse(dfs.exists(stagedPath));
124    Assert.assertTrue(dfs.exists(srcPath));
125  }
126
127  @Test
128  public void testMovingStagedFileWithCustomStageDir() throws Exception {
129    Path stagingDirPath =
130      new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR));
131    if (!dfs.exists(stagingDirPath)) {
132      dfs.mkdirs(stagingDirPath);
133    }
134    SecureBulkLoadManager.SecureBulkLoadListener listener =
135      new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf);
136
137    // creating file to load
138    String srcFile = createHFileForFamilies(FAMILY);
139    Path srcPath = new Path(srcFile);
140    Assert.assertTrue(dfs.exists(srcPath));
141
142    Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY)));
143    if (!dfs.exists(stagedFamily)) {
144      dfs.mkdirs(stagedFamily);
145    }
146
147    Path customStagingDirPath =
148      new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), CUSTOM_STAGING_DIR));
149    Path customStagedFamily = new Path(customStagingDirPath, new Path(Bytes.toString(FAMILY)));
150    if (!dfs.exists(customStagedFamily)) {
151      dfs.mkdirs(customStagedFamily);
152    }
153
154    // moving file to staging using a custom staging dir
155    String stagedFile =
156      listener.prepareBulkLoad(FAMILY, srcFile, false, customStagingDirPath.toString());
157    Path stagedPath = new Path(stagedFile);
158    Assert.assertTrue(dfs.exists(stagedPath));
159    Assert.assertFalse(dfs.exists(srcPath));
160
161    // moving files back to original location after a failed bulkload
162    listener.failedBulkLoad(FAMILY, stagedFile);
163    Assert.assertFalse(dfs.exists(stagedPath));
164    Assert.assertTrue(dfs.exists(srcPath));
165  }
166
167  @Test
168  public void testCopiedStagedFile() throws Exception {
169    Path stagingDirPath =
170      new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR));
171    if (!dfs.exists(stagingDirPath)) {
172      dfs.mkdirs(stagingDirPath);
173    }
174    SecureBulkLoadManager.SecureBulkLoadListener listener =
175      new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf);
176
177    // creating file to load
178    String srcFile = createHFileForFamilies(FAMILY);
179    Path srcPath = new Path(srcFile);
180    Assert.assertTrue(dfs.exists(srcPath));
181
182    Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY)));
183    if (!dfs.exists(stagedFamily)) {
184      dfs.mkdirs(stagedFamily);
185    }
186
187    // copying file to staging
188    String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, true, null);
189    Path stagedPath = new Path(stagedFile);
190    Assert.assertTrue(dfs.exists(stagedPath));
191    Assert.assertTrue(dfs.exists(srcPath));
192
193    // should do nothing because the original file was copied to staging
194    listener.failedBulkLoad(FAMILY, stagedFile);
195    Assert.assertTrue(dfs.exists(stagedPath));
196    Assert.assertTrue(dfs.exists(srcPath));
197  }
198
199  @Test(expected = IOException.class)
200  public void testDeletedStagedFile() throws Exception {
201    Path stagingDirPath =
202      new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR));
203    if (!dfs.exists(stagingDirPath)) {
204      dfs.mkdirs(stagingDirPath);
205    }
206    SecureBulkLoadManager.SecureBulkLoadListener listener =
207      new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf);
208
209    // creating file to load
210    String srcFile = createHFileForFamilies(FAMILY);
211    Path srcPath = new Path(srcFile);
212    Assert.assertTrue(dfs.exists(srcPath));
213
214    Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY)));
215    if (!dfs.exists(stagedFamily)) {
216      dfs.mkdirs(stagedFamily);
217    }
218
219    // moving file to staging
220    String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, false, null);
221    Path stagedPath = new Path(stagedFile);
222    Assert.assertTrue(dfs.exists(stagedPath));
223    Assert.assertFalse(dfs.exists(srcPath));
224
225    dfs.delete(stagedPath, false);
226
227    // moving files back to original location after a failed bulkload
228    listener.failedBulkLoad(FAMILY, stagedFile);
229  }
230
231  private String createHFileForFamilies(byte[] family) throws IOException {
232    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
233    Path testDir =
234      new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), Bytes.toString(family)));
235    if (!dfs.exists(testDir)) {
236      dfs.mkdirs(testDir);
237    }
238    Path hfilePath = new Path(testDir, generateUniqueName(null));
239    FSDataOutputStream out = dfs.createFile(hfilePath).build();
240    try {
241      hFileFactory.withOutputStream(out);
242      hFileFactory.withFileContext(new HFileContextBuilder().build());
243      HFile.Writer writer = hFileFactory.create();
244      try {
245        writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
246          .setRow(randomBytes).setFamily(family).setQualifier(randomBytes).setTimestamp(0L)
247          .setType(KeyValue.Type.Put.getCode()).setValue(randomBytes).build()));
248      } finally {
249        writer.close();
250      }
251    } finally {
252      out.close();
253    }
254    return hfilePath.toString();
255  }
256
257  private static String generateUniqueName(final String suffix) {
258    String name = UUID.randomUUID().toString().replaceAll("-", "");
259    if (suffix != null) name += suffix;
260    return name;
261  }
262
263}