001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor.example; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022 023import java.io.IOException; 024import org.apache.hadoop.hbase.HBaseClassTestRule; 025import org.apache.hadoop.hbase.HBaseTestingUtility; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 028import org.apache.hadoop.hbase.client.Get; 029import org.apache.hadoop.hbase.client.Put; 030import org.apache.hadoop.hbase.client.Table; 031import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 032import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 033import org.apache.hadoop.hbase.testclassification.MediumTests; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.zookeeper.CreateMode; 037import org.apache.zookeeper.KeeperException; 038import org.apache.zookeeper.ZooDefs; 039import org.apache.zookeeper.ZooKeeper; 040import org.junit.AfterClass; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045 046@Category({ CoprocessorTests.class, MediumTests.class }) 047public class TestZooKeeperScanPolicyObserver { 048 049 @ClassRule 050 public static final HBaseClassTestRule CLASS_RULE = 051 HBaseClassTestRule.forClass(TestZooKeeperScanPolicyObserver.class); 052 053 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 054 055 private static TableName NAME = TableName.valueOf("TestCP"); 056 057 private static byte[] FAMILY = Bytes.toBytes("cf"); 058 059 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 060 061 private static Table TABLE; 062 063 @BeforeClass 064 public static void setUp() throws Exception { 065 UTIL.startMiniCluster(3); 066 UTIL.getAdmin() 067 .createTable(TableDescriptorBuilder.newBuilder(NAME) 068 .setCoprocessor(ZooKeeperScanPolicyObserver.class.getName()) 069 .setValue(ZooKeeperScanPolicyObserver.ZK_ENSEMBLE_KEY, 070 UTIL.getZkCluster().getAddress().toString()) 071 .setValue(ZooKeeperScanPolicyObserver.ZK_SESSION_TIMEOUT_KEY, "2000") 072 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()).build()); 073 TABLE = UTIL.getConnection().getTable(NAME); 074 } 075 076 @AfterClass 077 public static void tearDown() throws Exception { 078 if (TABLE != null) { 079 TABLE.close(); 080 } 081 UTIL.shutdownMiniCluster(); 082 } 083 084 private void setExpireBefore(long time) 085 throws KeeperException, InterruptedException, IOException { 086 ZooKeeper zk = UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().getZooKeeper(); 087 if (zk.exists(ZooKeeperScanPolicyObserver.NODE, false) == null) { 088 zk.create(ZooKeeperScanPolicyObserver.NODE, Bytes.toBytes(time), ZooDefs.Ids.OPEN_ACL_UNSAFE, 089 CreateMode.PERSISTENT); 090 } else { 091 zk.setData(ZooKeeperScanPolicyObserver.NODE, Bytes.toBytes(time), -1); 092 } 093 } 094 095 private void assertValueEquals(int start, int end) throws IOException { 096 for (int i = start; i < end; i++) { 097 assertEquals(i, 098 Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUALIFIER))); 099 } 100 } 101 102 private void assertNotExists(int start, int end) throws IOException { 103 for (int i = start; i < end; i++) { 104 assertFalse(TABLE.exists(new Get(Bytes.toBytes(i)))); 105 } 106 } 107 108 private void put(int start, int end, long ts) throws IOException { 109 for (int i = start; i < end; i++) { 110 TABLE.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, ts, Bytes.toBytes(i))); 111 } 112 } 113 114 @Test 115 public void test() throws IOException, KeeperException, InterruptedException { 116 long now = EnvironmentEdgeManager.currentTime(); 117 put(0, 100, now - 10000); 118 assertValueEquals(0, 100); 119 120 setExpireBefore(now - 5000); 121 Thread.sleep(5000); 122 UTIL.getAdmin().flush(NAME); 123 assertNotExists(0, 100); 124 125 put(0, 50, now - 1000); 126 UTIL.getAdmin().flush(NAME); 127 put(50, 100, now - 100); 128 UTIL.getAdmin().flush(NAME); 129 assertValueEquals(0, 100); 130 131 setExpireBefore(now - 500); 132 Thread.sleep(5000); 133 UTIL.getAdmin().majorCompact(NAME); 134 UTIL.waitFor(30000, () -> UTIL.getHBaseCluster().getRegions(NAME).iterator().next() 135 .getStore(FAMILY).getStorefilesCount() == 1); 136 assertNotExists(0, 50); 137 assertValueEquals(50, 100); 138 } 139}