001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.Mockito.doThrow; 025import static org.mockito.Mockito.spy; 026 027import java.io.IOException; 028import java.lang.reflect.Field; 029import java.util.ArrayList; 030import java.util.Iterator; 031import java.util.List; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.Abortable; 037import org.apache.hadoop.hbase.ChoreService; 038import org.apache.hadoop.hbase.CoordinatedStateManager; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.Server; 043import org.apache.hadoop.hbase.ServerName; 044import org.apache.hadoop.hbase.ZooKeeperConnectionException; 045import org.apache.hadoop.hbase.client.ClusterConnection; 046import org.apache.hadoop.hbase.client.Connection; 047import org.apache.hadoop.hbase.master.HMaster; 048import org.apache.hadoop.hbase.replication.ReplicationException; 049import org.apache.hadoop.hbase.replication.ReplicationFactory; 050import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 051import org.apache.hadoop.hbase.replication.ReplicationPeers; 052import org.apache.hadoop.hbase.replication.ReplicationQueues; 053import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; 054import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; 055import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; 056import org.apache.hadoop.hbase.testclassification.MasterTests; 057import org.apache.hadoop.hbase.testclassification.SmallTests; 058import org.apache.hadoop.hbase.util.Pair; 059import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 060import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; 061import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 062import org.apache.zookeeper.KeeperException; 063import org.apache.zookeeper.data.Stat; 064import org.junit.After; 065import org.junit.AfterClass; 066import org.junit.Before; 067import org.junit.BeforeClass; 068import org.junit.ClassRule; 069import org.junit.Test; 070import org.junit.experimental.categories.Category; 071import org.mockito.Mockito; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 076 077@Category({ MasterTests.class, SmallTests.class }) 078public class TestReplicationHFileCleaner { 079 080 @ClassRule 081 public static final HBaseClassTestRule CLASS_RULE = 082 HBaseClassTestRule.forClass(TestReplicationHFileCleaner.class); 083 084 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class); 085 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 086 private static Server server; 087 private static ReplicationQueues rq; 088 private static ReplicationPeers rp; 089 private static final String peerId = "TestReplicationHFileCleaner"; 090 private static Configuration conf = TEST_UTIL.getConfiguration(); 091 static FileSystem fs = null; 092 Path root; 093 094 /** 095 * @throws java.lang.Exception 096 */ 097 @BeforeClass 098 public static void setUpBeforeClass() throws Exception { 099 TEST_UTIL.startMiniZKCluster(); 100 server = new DummyServer(); 101 conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); 102 HMaster.decorateMasterConfiguration(conf); 103 rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server); 104 rp.init(); 105 rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper())); 106 rq.init(server.getServerName().toString()); 107 try { 108 fs = FileSystem.get(conf); 109 } finally { 110 if (fs != null) { 111 fs.close(); 112 } 113 } 114 } 115 116 /** 117 * @throws java.lang.Exception 118 */ 119 @AfterClass 120 public static void tearDownAfterClass() throws Exception { 121 TEST_UTIL.shutdownMiniZKCluster(); 122 } 123 124 @Before 125 public void setup() throws ReplicationException, IOException { 126 root = TEST_UTIL.getDataTestDirOnTestFS(); 127 rp.registerPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey())); 128 rq.addPeerToHFileRefs(peerId); 129 } 130 131 @After 132 public void cleanup() throws ReplicationException { 133 try { 134 fs.delete(root, true); 135 } catch (IOException e) { 136 LOG.warn("Failed to delete files recursively from path " + root); 137 } 138 rp.unregisterPeer(peerId); 139 } 140 141 @Test 142 public void testIsFileDeletable() throws IOException, ReplicationException { 143 // 1. Create a file 144 Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs"); 145 fs.createNewFile(file); 146 // 2. Assert file is successfully created 147 assertTrue("Test file not created!", fs.exists(file)); 148 ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); 149 cleaner.setConf(conf); 150 // 3. Assert that file as is should be deletable 151 assertTrue("Cleaner should allow to delete this file as there is no hfile reference node " 152 + "for it in the queue.", 153 cleaner.isFileDeletable(fs.getFileStatus(file))); 154 155 List<Pair<Path, Path>> files = new ArrayList<>(1); 156 files.add(new Pair<>(null, file)); 157 // 4. Add the file to hfile-refs queue 158 rq.addHFileRefs(peerId, files); 159 // 5. Assert file should not be deletable 160 assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node " 161 + "for it in the queue.", 162 cleaner.isFileDeletable(fs.getFileStatus(file))); 163 } 164 165 @Test 166 public void testGetDeletableFiles() throws Exception { 167 // 1. Create two files and assert that they do not exist 168 Path notDeletablefile = new Path(root, "testGetDeletableFiles_1"); 169 fs.createNewFile(notDeletablefile); 170 assertTrue("Test file not created!", fs.exists(notDeletablefile)); 171 Path deletablefile = new Path(root, "testGetDeletableFiles_2"); 172 fs.createNewFile(deletablefile); 173 assertTrue("Test file not created!", fs.exists(deletablefile)); 174 175 List<FileStatus> files = new ArrayList<>(2); 176 FileStatus f = new FileStatus(); 177 f.setPath(deletablefile); 178 files.add(f); 179 f = new FileStatus(); 180 f.setPath(notDeletablefile); 181 files.add(f); 182 183 List<Pair<Path, Path>> hfiles = new ArrayList<>(1); 184 hfiles.add(new Pair<>(null, notDeletablefile)); 185 // 2. Add one file to hfile-refs queue 186 rq.addHFileRefs(peerId, hfiles); 187 188 ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); 189 cleaner.setConf(conf); 190 Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator(); 191 int i = 0; 192 while (deletableFilesIterator.hasNext() && i < 2) { 193 i++; 194 } 195 // 5. Assert one file should not be deletable and it is present in the list returned 196 if (i > 2) { 197 fail("File " + notDeletablefile 198 + " should not be deletable as its hfile reference node is not added."); 199 } 200 assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile)); 201 } 202 203 /* 204 * Test for HBASE-14621. This test will not assert directly anything. Without the fix the test 205 * will end up in a infinite loop, so it will timeout. 206 */ 207 @Test 208 public void testForDifferntHFileRefsZnodeVersion() throws Exception { 209 // 1. Create a file 210 Path file = new Path(root, "testForDifferntHFileRefsZnodeVersion"); 211 fs.createNewFile(file); 212 // 2. Assert file is successfully created 213 assertTrue("Test file not created!", fs.exists(file)); 214 ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); 215 cleaner.setConf(conf); 216 217 ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class); 218 //Return different znode version for each call 219 Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2); 220 221 Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass(); 222 Field rqc = cleanerClass.getDeclaredField("rqc"); 223 rqc.setAccessible(true); 224 rqc.set(cleaner, replicationQueuesClient); 225 226 cleaner.isFileDeletable(fs.getFileStatus(file)); 227 } 228 229 /** 230 * ReplicationHFileCleaner should be able to ride over ZooKeeper errors without aborting. 231 */ 232 @Test 233 public void testZooKeeperAbort() throws Exception { 234 ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); 235 236 List<FileStatus> dummyFiles = 237 Lists.newArrayList(new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path( 238 "hfile1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path( 239 "hfile2"))); 240 241 FaultyZooKeeperWatcher faultyZK = 242 new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null); 243 try { 244 faultyZK.init(); 245 cleaner.setConf(conf, faultyZK); 246 // should keep all files due to a ConnectionLossException getting the queues znodes 247 Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles); 248 assertFalse(toDelete.iterator().hasNext()); 249 assertFalse(cleaner.isStopped()); 250 } finally { 251 faultyZK.close(); 252 } 253 254 // when zk is working both files should be returned 255 cleaner = new ReplicationHFileCleaner(); 256 ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null); 257 try { 258 cleaner.setConf(conf, zkw); 259 Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles); 260 Iterator<FileStatus> iter = filesToDelete.iterator(); 261 assertTrue(iter.hasNext()); 262 assertEquals(new Path("hfile1"), iter.next().getPath()); 263 assertTrue(iter.hasNext()); 264 assertEquals(new Path("hfile2"), iter.next().getPath()); 265 assertFalse(iter.hasNext()); 266 } finally { 267 zkw.close(); 268 } 269 } 270 271 static class DummyServer implements Server { 272 273 @Override 274 public Configuration getConfiguration() { 275 return TEST_UTIL.getConfiguration(); 276 } 277 278 @Override 279 public ZKWatcher getZooKeeper() { 280 try { 281 return new ZKWatcher(getConfiguration(), "dummy server", this); 282 } catch (IOException e) { 283 e.printStackTrace(); 284 } 285 return null; 286 } 287 288 @Override 289 public CoordinatedStateManager getCoordinatedStateManager() { 290 return null; 291 } 292 293 @Override 294 public ClusterConnection getConnection() { 295 return null; 296 } 297 298 @Override 299 public MetaTableLocator getMetaTableLocator() { 300 return null; 301 } 302 303 @Override 304 public ServerName getServerName() { 305 return ServerName.valueOf("regionserver,60020,000000"); 306 } 307 308 @Override 309 public void abort(String why, Throwable e) { 310 } 311 312 @Override 313 public boolean isAborted() { 314 return false; 315 } 316 317 @Override 318 public void stop(String why) { 319 } 320 321 @Override 322 public boolean isStopped() { 323 return false; 324 } 325 326 @Override 327 public ChoreService getChoreService() { 328 return null; 329 } 330 331 @Override 332 public ClusterConnection getClusterConnection() { 333 // TODO Auto-generated method stub 334 return null; 335 } 336 337 @Override 338 public FileSystem getFileSystem() { 339 return null; 340 } 341 342 @Override 343 public boolean isStopping() { 344 return false; 345 } 346 347 @Override 348 public Connection createConnection(Configuration conf) throws IOException { 349 return null; 350 } 351 } 352 353 static class FaultyZooKeeperWatcher extends ZKWatcher { 354 private RecoverableZooKeeper zk; 355 public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable) 356 throws ZooKeeperConnectionException, IOException { 357 super(conf, identifier, abortable); 358 } 359 360 public void init() throws Exception { 361 this.zk = spy(super.getRecoverableZooKeeper()); 362 doThrow(new KeeperException.ConnectionLossException()) 363 .when(zk).getData("/hbase/replication/hfile-refs", null, new Stat()); 364 } 365 366 @Override 367 public RecoverableZooKeeper getRecoverableZooKeeper() { 368 return zk; 369 } 370 } 371}