001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.Random; 022import java.util.UUID; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.fs.FSDataOutputStream; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.CellBuilderType; 028import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.io.hfile.HFile; 033import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 034import org.apache.hadoop.hbase.testclassification.LargeTests; 035import org.apache.hadoop.hbase.testclassification.MiscTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hdfs.DistributedFileSystem; 038import org.apache.hadoop.hdfs.MiniDFSCluster; 039import org.junit.After; 040import org.junit.Assert; 041import org.junit.Before; 042import org.junit.ClassRule; 043import org.junit.Rule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.junit.rules.TemporaryFolder; 047import org.junit.rules.TestName; 048 049/** 050 * Tests for failedBulkLoad logic to make sure staged files are returned to their original location 051 * if the bulkload have failed. 052 */ 053@Category({ MiscTests.class, LargeTests.class }) 054public class TestSecureBulkloadListener { 055 056 @ClassRule 057 public static final HBaseClassTestRule CLASS_RULE = 058 HBaseClassTestRule.forClass(TestSecureBulkloadListener.class); 059 060 @ClassRule 061 public static TemporaryFolder testFolder = new TemporaryFolder(); 062 private Configuration conf; 063 private MiniDFSCluster cluster; 064 private HBaseTestingUtility htu; 065 private DistributedFileSystem dfs; 066 private final Random random = new Random(); 067 private final byte[] randomBytes = new byte[100]; 068 private static final String host1 = "host1"; 069 private static final String host2 = "host2"; 070 private static final String host3 = "host3"; 071 private static byte[] FAMILY = Bytes.toBytes("family"); 072 private static final String STAGING_DIR = "staging"; 073 private static final String CUSTOM_STAGING_DIR = "customStaging"; 074 075 @Rule 076 public TestName name = new TestName(); 077 078 @Before 079 public void setUp() throws Exception { 080 random.nextBytes(randomBytes); 081 htu = new HBaseTestingUtility(); 082 htu.getConfiguration().setInt("dfs.blocksize", 1024);// For the test with multiple blocks 083 htu.getConfiguration().setInt("dfs.replication", 3); 084 htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" }, 085 new String[] { host1, host2, host3 }); 086 087 conf = htu.getConfiguration(); 088 cluster = htu.getDFSCluster(); 089 dfs = (DistributedFileSystem) FileSystem.get(conf); 090 } 091 092 @After 093 public void tearDownAfterClass() throws Exception { 094 htu.shutdownMiniCluster(); 095 } 096 097 @Test 098 public void testMovingStagedFile() throws Exception { 099 Path stagingDirPath = 100 new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR)); 101 if (!dfs.exists(stagingDirPath)) { 102 dfs.mkdirs(stagingDirPath); 103 } 104 SecureBulkLoadManager.SecureBulkLoadListener listener = 105 new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf); 106 107 // creating file to load 108 String srcFile = createHFileForFamilies(FAMILY); 109 Path srcPath = new Path(srcFile); 110 Assert.assertTrue(dfs.exists(srcPath)); 111 112 Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY))); 113 if (!dfs.exists(stagedFamily)) { 114 dfs.mkdirs(stagedFamily); 115 } 116 117 // moving file to staging 118 String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, false, null); 119 Path stagedPath = new Path(stagedFile); 120 Assert.assertTrue(dfs.exists(stagedPath)); 121 Assert.assertFalse(dfs.exists(srcPath)); 122 123 // moving files back to original location after a failed bulkload 124 listener.failedBulkLoad(FAMILY, stagedFile); 125 Assert.assertFalse(dfs.exists(stagedPath)); 126 Assert.assertTrue(dfs.exists(srcPath)); 127 } 128 129 @Test 130 public void testMovingStagedFileWithCustomStageDir() throws Exception { 131 Path stagingDirPath = 132 new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR)); 133 if (!dfs.exists(stagingDirPath)) { 134 dfs.mkdirs(stagingDirPath); 135 } 136 SecureBulkLoadManager.SecureBulkLoadListener listener = 137 new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf); 138 139 // creating file to load 140 String srcFile = createHFileForFamilies(FAMILY); 141 Path srcPath = new Path(srcFile); 142 Assert.assertTrue(dfs.exists(srcPath)); 143 144 Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY))); 145 if (!dfs.exists(stagedFamily)) { 146 dfs.mkdirs(stagedFamily); 147 } 148 149 Path customStagingDirPath = 150 new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), CUSTOM_STAGING_DIR)); 151 Path customStagedFamily = new Path(customStagingDirPath, new Path(Bytes.toString(FAMILY))); 152 if (!dfs.exists(customStagedFamily)) { 153 dfs.mkdirs(customStagedFamily); 154 } 155 156 // moving file to staging using a custom staging dir 157 String stagedFile = 158 listener.prepareBulkLoad(FAMILY, srcFile, false, customStagingDirPath.toString()); 159 Path stagedPath = new Path(stagedFile); 160 Assert.assertTrue(dfs.exists(stagedPath)); 161 Assert.assertFalse(dfs.exists(srcPath)); 162 163 // moving files back to original location after a failed bulkload 164 listener.failedBulkLoad(FAMILY, stagedFile); 165 Assert.assertFalse(dfs.exists(stagedPath)); 166 Assert.assertTrue(dfs.exists(srcPath)); 167 } 168 169 @Test 170 public void testCopiedStagedFile() throws Exception { 171 Path stagingDirPath = 172 new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR)); 173 if (!dfs.exists(stagingDirPath)) { 174 dfs.mkdirs(stagingDirPath); 175 } 176 SecureBulkLoadManager.SecureBulkLoadListener listener = 177 new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf); 178 179 // creating file to load 180 String srcFile = createHFileForFamilies(FAMILY); 181 Path srcPath = new Path(srcFile); 182 Assert.assertTrue(dfs.exists(srcPath)); 183 184 Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY))); 185 if (!dfs.exists(stagedFamily)) { 186 dfs.mkdirs(stagedFamily); 187 } 188 189 // copying file to staging 190 String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, true, null); 191 Path stagedPath = new Path(stagedFile); 192 Assert.assertTrue(dfs.exists(stagedPath)); 193 Assert.assertTrue(dfs.exists(srcPath)); 194 195 // should do nothing because the original file was copied to staging 196 listener.failedBulkLoad(FAMILY, stagedFile); 197 Assert.assertTrue(dfs.exists(stagedPath)); 198 Assert.assertTrue(dfs.exists(srcPath)); 199 } 200 201 @Test(expected = IOException.class) 202 public void testDeletedStagedFile() throws Exception { 203 Path stagingDirPath = 204 new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), STAGING_DIR)); 205 if (!dfs.exists(stagingDirPath)) { 206 dfs.mkdirs(stagingDirPath); 207 } 208 SecureBulkLoadManager.SecureBulkLoadListener listener = 209 new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf); 210 211 // creating file to load 212 String srcFile = createHFileForFamilies(FAMILY); 213 Path srcPath = new Path(srcFile); 214 Assert.assertTrue(dfs.exists(srcPath)); 215 216 Path stagedFamily = new Path(stagingDirPath, new Path(Bytes.toString(FAMILY))); 217 if (!dfs.exists(stagedFamily)) { 218 dfs.mkdirs(stagedFamily); 219 } 220 221 // moving file to staging 222 String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, false, null); 223 Path stagedPath = new Path(stagedFile); 224 Assert.assertTrue(dfs.exists(stagedPath)); 225 Assert.assertFalse(dfs.exists(srcPath)); 226 227 dfs.delete(stagedPath, false); 228 229 // moving files back to original location after a failed bulkload 230 listener.failedBulkLoad(FAMILY, stagedFile); 231 } 232 233 private String createHFileForFamilies(byte[] family) throws IOException { 234 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf); 235 Path testDir = 236 new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), Bytes.toString(family))); 237 if (!dfs.exists(testDir)) { 238 dfs.mkdirs(testDir); 239 } 240 Path hfilePath = new Path(testDir, generateUniqueName(null)); 241 FSDataOutputStream out = dfs.createFile(hfilePath).build(); 242 try { 243 hFileFactory.withOutputStream(out); 244 hFileFactory.withFileContext(new HFileContextBuilder().build()); 245 HFile.Writer writer = hFileFactory.create(); 246 try { 247 writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) 248 .setRow(randomBytes).setFamily(family).setQualifier(randomBytes).setTimestamp(0L) 249 .setType(KeyValue.Type.Put.getCode()).setValue(randomBytes).build())); 250 } finally { 251 writer.close(); 252 } 253 } finally { 254 out.close(); 255 } 256 return hfilePath.toString(); 257 } 258 259 private static String generateUniqueName(final String suffix) { 260 String name = UUID.randomUUID().toString().replaceAll("-", ""); 261 if (suffix != null) { 262 name += suffix; 263 } 264 return name; 265 } 266 267}