/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;

/**
 * Tests for failedBulkLoad logic to make sure staged files are returned to their original location
 * if the bulkload has failed.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestSecureBulkloadListener {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSecureBulkloadListener.class);

  @ClassRule
  public static TemporaryFolder testFolder = new TemporaryFolder();
  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtil htu;
  private DistributedFileSystem dfs;
  private final byte[] randomBytes = new byte[100];
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";
  private static final byte[] FAMILY = Bytes.toBytes("family");
  private static final String STAGING_DIR = "staging";
  private static final String CUSTOM_STAGING_DIR = "customStaging";

  @Rule
  public TestName name = new TestName();

  /**
   * Starts a fresh three-datanode mini DFS cluster before every test and caches its configuration
   * and file system handle.
   */
  @Before
  public void setUp() throws Exception {
    Bytes.random(randomBytes);
    htu = new HBaseTestingUtil();
    htu.getConfiguration().setInt("dfs.blocksize", 1024);// For the test with multiple blocks
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3, new String[] { "/r1", "/r2", "/r3" },
      new String[] { host1, host2, host3 });

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  /**
   * Shuts the mini cluster down. Despite the method name this is an {@code @After} hook, so it runs
   * after every test, mirroring {@link #setUp()}.
   */
  @After
  public void tearDownAfterClass() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * A file moved into the default staging dir must be moved back to its source location when the
   * bulk load fails.
   */
  @Test
  public void testMovingStagedFile() throws Exception {
    Path stagingDirPath = ensureDirExists(testPath(STAGING_DIR));
    SecureBulkLoadManager.SecureBulkLoadListener listener = newListener(stagingDirPath);

    // creating file to load
    String srcFile = createHFileForFamilies(FAMILY);
    Path srcPath = new Path(srcFile);
    Assert.assertTrue(dfs.exists(srcPath));

    ensureDirExists(familyDirUnder(stagingDirPath));

    // moving file to staging
    String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, false, null);
    Path stagedPath = new Path(stagedFile);
    Assert.assertTrue(dfs.exists(stagedPath));
    Assert.assertFalse(dfs.exists(srcPath));

    // moving files back to original location after a failed bulkload
    listener.failedBulkLoad(FAMILY, stagedFile);
    Assert.assertFalse(dfs.exists(stagedPath));
    Assert.assertTrue(dfs.exists(srcPath));
  }

  /**
   * Same as {@link #testMovingStagedFile()} but the file is staged into a caller-supplied custom
   * staging dir instead of the listener's default one.
   */
  @Test
  public void testMovingStagedFileWithCustomStageDir() throws Exception {
    Path stagingDirPath = ensureDirExists(testPath(STAGING_DIR));
    SecureBulkLoadManager.SecureBulkLoadListener listener = newListener(stagingDirPath);

    // creating file to load
    String srcFile = createHFileForFamilies(FAMILY);
    Path srcPath = new Path(srcFile);
    Assert.assertTrue(dfs.exists(srcPath));

    ensureDirExists(familyDirUnder(stagingDirPath));

    // mkdirs on the family dir also creates the custom staging dir itself
    Path customStagingDirPath = testPath(CUSTOM_STAGING_DIR);
    ensureDirExists(familyDirUnder(customStagingDirPath));

    // moving file to staging using a custom staging dir
    String stagedFile =
      listener.prepareBulkLoad(FAMILY, srcFile, false, customStagingDirPath.toString());
    Path stagedPath = new Path(stagedFile);
    Assert.assertTrue(dfs.exists(stagedPath));
    Assert.assertFalse(dfs.exists(srcPath));

    // moving files back to original location after a failed bulkload
    listener.failedBulkLoad(FAMILY, stagedFile);
    Assert.assertFalse(dfs.exists(stagedPath));
    Assert.assertTrue(dfs.exists(srcPath));
  }

  /**
   * When the source file is copied (not moved) to staging, a failed bulk load must leave both the
   * staged copy and the source file in place.
   */
  @Test
  public void testCopiedStagedFile() throws Exception {
    Path stagingDirPath = ensureDirExists(testPath(STAGING_DIR));
    SecureBulkLoadManager.SecureBulkLoadListener listener = newListener(stagingDirPath);

    // creating file to load
    String srcFile = createHFileForFamilies(FAMILY);
    Path srcPath = new Path(srcFile);
    Assert.assertTrue(dfs.exists(srcPath));

    ensureDirExists(familyDirUnder(stagingDirPath));

    // copying file to staging
    String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, true, null);
    Path stagedPath = new Path(stagedFile);
    Assert.assertTrue(dfs.exists(stagedPath));
    Assert.assertTrue(dfs.exists(srcPath));

    // should do nothing because the original file was copied to staging
    listener.failedBulkLoad(FAMILY, stagedFile);
    Assert.assertTrue(dfs.exists(stagedPath));
    Assert.assertTrue(dfs.exists(srcPath));
  }

  /**
   * If the staged file has disappeared, {@code failedBulkLoad} must report it with an
   * {@link IOException}.
   * <p>
   * The expectation is scoped to the {@code failedBulkLoad} call only, so an {@link IOException}
   * raised by any of the preparation steps fails the test instead of satisfying it.
   */
  @Test
  public void testDeletedStagedFile() throws Exception {
    Path stagingDirPath = ensureDirExists(testPath(STAGING_DIR));
    SecureBulkLoadManager.SecureBulkLoadListener listener = newListener(stagingDirPath);

    // creating file to load
    String srcFile = createHFileForFamilies(FAMILY);
    Path srcPath = new Path(srcFile);
    Assert.assertTrue(dfs.exists(srcPath));

    ensureDirExists(familyDirUnder(stagingDirPath));

    // moving file to staging
    String stagedFile = listener.prepareBulkLoad(FAMILY, srcFile, false, null);
    Path stagedPath = new Path(stagedFile);
    Assert.assertTrue(dfs.exists(stagedPath));
    Assert.assertFalse(dfs.exists(srcPath));

    // make sure the staged file is really gone before exercising the failure path
    Assert.assertTrue(dfs.delete(stagedPath, false));

    // moving files back to original location after a failed bulkload
    try {
      listener.failedBulkLoad(FAMILY, stagedFile);
      Assert.fail("failedBulkLoad should throw an IOException when the staged file is missing");
    } catch (IOException expected) {
      // expected: there is no staged file left to move back
    }
  }

  /**
   * Builds {@code <working dir>/<current test method name>/<child>} without touching the file
   * system.
   */
  private Path testPath(String child) {
    return new Path(dfs.getWorkingDirectory(), new Path(name.getMethodName(), child));
  }

  /** Returns the directory named after {@link #FAMILY} under {@code parent}; does not create it. */
  private Path familyDirUnder(Path parent) {
    return new Path(parent, new Path(Bytes.toString(FAMILY)));
  }

  /**
   * Creates {@code dir} on the test DFS when it does not exist yet.
   * @return {@code dir}, for call chaining
   */
  private Path ensureDirExists(Path dir) throws IOException {
    if (!dfs.exists(dir)) {
      dfs.mkdirs(dir);
    }
    return dir;
  }

  /** Creates the listener under test, bound to the test DFS and the given staging dir. */
  private SecureBulkLoadManager.SecureBulkLoadListener newListener(Path stagingDirPath) {
    return new SecureBulkLoadManager.SecureBulkLoadListener(dfs, stagingDirPath.toString(), conf);
  }

  /**
   * Writes a single-cell HFile under {@code <working dir>/<test method>/<family>} on the test DFS.
   * @param family column family used both for the directory name and for the written cell
   * @return the path of the created HFile, as a string
   * @throws IOException if the directory or the file cannot be created or written
   */
  private String createHFileForFamilies(byte[] family) throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
    Path testDir = ensureDirExists(testPath(Bytes.toString(family)));
    Path hfilePath = new Path(testDir, generateUniqueName(null));
    // Resources are closed in reverse order: the writer first, then the underlying stream.
    try (FSDataOutputStream out = dfs.createFile(hfilePath).build()) {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContextBuilder().build());
      try (HFile.Writer writer = hFileFactory.create()) {
        writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
          .setRow(randomBytes).setFamily(family).setQualifier(randomBytes).setTimestamp(0L)
          .setType(KeyValue.Type.Put.getCode()).setValue(randomBytes).build()));
      }
    }
    return hfilePath.toString();
  }

  /**
   * Generates a random file name: a UUID with the dashes removed, followed by {@code suffix} when
   * one is given.
   * @param suffix optional suffix to append; may be {@code null}
   */
  private static String generateUniqueName(final String suffix) {
    String name = UUID.randomUUID().toString().replace("-", "");
    if (suffix != null) {
      name += suffix;
    }
    return name;
  }

}