/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests for {@link ReplicationHFileCleaner}, run against a mini ZooKeeper cluster.
 * <p>
 * Each test registers a replication peer and an hfile-refs entry for it in {@link #setup()},
 * then checks which files the cleaner reports as deletable depending on whether they are
 * referenced from the hfile-refs queue.
 */
@Category({ MasterTests.class, SmallTests.class })
public class TestReplicationHFileCleaner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationHFileCleaner.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class);
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Server server;
  /** Single watcher shared by {@link #rp} and {@link #rq}; closed in tearDownAfterClass. */
  private static ZKWatcher zkWatcher;
  private static ReplicationQueueStorage rq;
  private static ReplicationPeers rp;
  private static final String peerId = "TestReplicationHFileCleaner";
  private static Configuration conf = TEST_UTIL.getConfiguration();
  static FileSystem fs = null;
  /** Per-test data directory; created in {@link #setup()} and deleted in {@link #cleanup()}. */
  Path root;

  /**
   * Starts the mini ZooKeeper cluster, enables bulk-load replication in the shared
   * configuration and builds the replication peer/queue storage used by the tests.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniZKCluster();
    server = new DummyServer();
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    HMaster.decorateMasterConfiguration(conf);
    // DummyServer#getZooKeeper() builds a new watcher on every call, so fetch it once and
    // share it; otherwise each call would leave behind a watcher that is never closed.
    zkWatcher = server.getZooKeeper();
    rp = ReplicationFactory.getReplicationPeers(zkWatcher, conf);
    rp.init();
    rq = ReplicationStorageFactory.getReplicationQueueStorage(zkWatcher, conf);
    fs = FileSystem.get(conf);
  }

  /**
   * Closes the shared watcher and shuts the mini ZooKeeper cluster down.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    try {
      // Close the client before the cluster goes away; may be null if set-up failed early.
      if (zkWatcher != null) {
        zkWatcher.close();
      }
    } finally {
      TEST_UTIL.shutdownMiniZKCluster();
    }
  }

  /**
   * Resolves the test data directory and registers the test peer together with its
   * hfile-refs entry.
   */
  @Before
  public void setup() throws ReplicationException, IOException {
    root = TEST_UTIL.getDataTestDirOnTestFS();
    rp.getPeerStorage().addPeer(peerId,
      ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true);
    rq.addPeerToHFileRefs(peerId);
  }

  /**
   * Deletes the test data directory (best effort) and removes the test peer.
   */
  @After
  public void cleanup() throws ReplicationException {
    try {
      fs.delete(root, true);
    } catch (IOException e) {
      // Best effort only: keep going so the peer is still removed, but record the cause.
      LOG.warn("Failed to delete files recursively from path {}", root, e);
    }
    rp.getPeerStorage().removePeer(peerId);
  }

  /**
   * A file is deletable while it has no hfile reference, and stops being deletable once a
   * reference to it is added to the hfile-refs queue.
   */
  @Test
  public void testIsFileDeletable() throws IOException, ReplicationException {
    // 1. Create a file
    Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
    fs.createNewFile(file);
    // 2. Assert file is successfully created
    assertTrue("Test file not created!", fs.exists(file));
    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
    cleaner.setConf(conf);
    // 3. Assert that file as is should be deletable
    assertTrue("Cleaner should allow to delete this file as there is no hfile reference node "
      + "for it in the queue.",
      cleaner.isFileDeletable(fs.getFileStatus(file)));

    List<Pair<Path, Path>> files = new ArrayList<>(1);
    files.add(new Pair<>(null, file));
    // 4. Add the file to hfile-refs queue
    rq.addHFileRefs(peerId, files);
    // 5. Assert file should not be deletable
    assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node "
      + "for it in the queue.",
      cleaner.isFileDeletable(fs.getFileStatus(file)));
  }

  /**
   * Given one referenced and one unreferenced file, the cleaner must return exactly the
   * unreferenced one.
   */
  @Test
  public void testGetDeletableFiles() throws Exception {
    // 1. Create two files and assert that they exist
    Path notDeletablefile = new Path(root, "testGetDeletableFiles_1");
    fs.createNewFile(notDeletablefile);
    assertTrue("Test file not created!", fs.exists(notDeletablefile));
    Path deletablefile = new Path(root, "testGetDeletableFiles_2");
    fs.createNewFile(deletablefile);
    assertTrue("Test file not created!", fs.exists(deletablefile));

    List<FileStatus> files = new ArrayList<>(2);
    FileStatus f = new FileStatus();
    f.setPath(deletablefile);
    files.add(f);
    f = new FileStatus();
    f.setPath(notDeletablefile);
    files.add(f);

    List<Pair<Path, Path>> hfiles = new ArrayList<>(1);
    hfiles.add(new Pair<>(null, notDeletablefile));
    // 2. Add one file to hfile-refs queue
    rq.addHFileRefs(peerId, hfiles);

    // 3. Ask the cleaner which of the two files may be deleted
    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
    cleaner.setConf(conf);
    List<Path> deletablePaths = new ArrayList<>(files.size());
    for (FileStatus status : cleaner.getDeletableFiles(files)) {
      if (notDeletablefile.equals(status.getPath())) {
        fail("File " + notDeletablefile
          + " should not be deletable as its hfile reference node is added.");
      }
      deletablePaths.add(status.getPath());
    }

    // 4. Assert that only the unreferenced file is present in the list returned
    assertEquals("Exactly one file should be deletable", 1, deletablePaths.size());
    assertEquals(deletablefile, deletablePaths.get(0));
  }

  /**
   * ReplicationHFileCleaner should be able to ride over ZooKeeper errors without aborting.
   */
  @Test
  public void testZooKeeperAbort() throws Exception {
    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();

    List<FileStatus> dummyFiles =
      Lists.newArrayList(new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
        "hfile1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
        "hfile2")));

    FaultyZooKeeperWatcher faultyZK =
      new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
    try {
      faultyZK.init();
      cleaner.setConf(conf, faultyZK);
      // should keep all files due to a ConnectionLossException getting the queues znodes
      Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
      assertFalse(toDelete.iterator().hasNext());
      assertFalse(cleaner.isStopped());
    } finally {
      faultyZK.close();
    }

    // when zk is working both files should be returned
    cleaner = new ReplicationHFileCleaner();
    ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
    try {
      cleaner.setConf(conf, zkw);
      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
      Iterator<FileStatus> iter = filesToDelete.iterator();
      assertTrue(iter.hasNext());
      assertEquals(new Path("hfile1"), iter.next().getPath());
      assertTrue(iter.hasNext());
      assertEquals(new Path("hfile2"), iter.next().getPath());
      assertFalse(iter.hasNext());
    } finally {
      zkw.close();
    }
  }

  /**
   * Minimal {@link Server} stub: supplies the test configuration, a fixed server name and a
   * ZooKeeper watcher; every other accessor returns {@code null} or {@code false}, and
   * {@code abort}/{@code stop} do nothing.
   */
  static class DummyServer implements Server {

    @Override
    public Configuration getConfiguration() {
      return TEST_UTIL.getConfiguration();
    }

    /**
     * Creates a NEW watcher on every call; the caller owns it and is responsible for closing
     * it.
     * @return a fresh watcher, or {@code null} if it could not be created
     */
    @Override
    public ZKWatcher getZooKeeper() {
      try {
        return new ZKWatcher(getConfiguration(), "dummy server", this);
      } catch (IOException e) {
        LOG.error("Failed to create ZKWatcher for dummy server", e);
      }
      return null;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return null;
    }

    @Override
    public ClusterConnection getConnection() {
      return null;
    }

    @Override
    public ServerName getServerName() {
      return ServerName.valueOf("regionserver,60020,000000");
    }

    @Override
    public void abort(String why, Throwable e) {
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
    }

    @Override
    public boolean isStopped() {
      return false;
    }

    @Override
    public ChoreService getChoreService() {
      return null;
    }

    @Override
    public ClusterConnection getClusterConnection() {
      return null;
    }

    @Override
    public FileSystem getFileSystem() {
      return null;
    }

    @Override
    public boolean isStopping() {
      return false;
    }

    @Override
    public Connection createConnection(Configuration conf) throws IOException {
      return null;
    }
  }

  /**
   * {@link ZKWatcher} whose {@link RecoverableZooKeeper} is replaced by a Mockito spy that
   * throws {@link KeeperException.ConnectionLossException} when the hfile-refs znode is read.
   * {@link #init()} must be called before use: until then
   * {@link #getRecoverableZooKeeper()} returns {@code null}.
   */
  static class FaultyZooKeeperWatcher extends ZKWatcher {
    private RecoverableZooKeeper zk;

    public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
      throws ZooKeeperConnectionException, IOException {
      super(conf, identifier, abortable);
    }

    /**
     * Wraps the real client in a spy and stubs {@code getData} on the hfile-refs znode to
     * fail with a connection loss.
     */
    public void init() throws Exception {
      this.zk = spy(super.getRecoverableZooKeeper());
      // The stub matches on argument equality: a null watcher and a Stat equal to a fresh one.
      doThrow(new KeeperException.ConnectionLossException())
        .when(zk).getData("/hbase/replication/hfile-refs", null, new Stat());
    }

    @Override
    public RecoverableZooKeeper getRecoverableZooKeeper() {
      return zk;
    }
  }
}