001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor.example; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022 023import java.io.IOException; 024import org.apache.hadoop.hbase.HBaseClassTestRule; 025import org.apache.hadoop.hbase.HBaseTestingUtility; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 028import org.apache.hadoop.hbase.client.Get; 029import org.apache.hadoop.hbase.client.Put; 030import org.apache.hadoop.hbase.client.Table; 031import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 032import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 033import org.apache.hadoop.hbase.testclassification.MediumTests; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.zookeeper.CreateMode; 036import org.apache.zookeeper.KeeperException; 037import org.apache.zookeeper.ZooDefs; 038import org.apache.zookeeper.ZooKeeper; 039import org.junit.AfterClass; 040import org.junit.BeforeClass; 041import org.junit.ClassRule; 042import org.junit.Test; 043import org.junit.experimental.categories.Category; 044 045@Category({ CoprocessorTests.class, MediumTests.class }) 046public class TestZooKeeperScanPolicyObserver { 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestZooKeeperScanPolicyObserver.class); 051 052 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 053 054 private static TableName NAME = TableName.valueOf("TestCP"); 055 056 private static byte[] FAMILY = Bytes.toBytes("cf"); 057 058 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 059 060 private static Table TABLE; 061 062 @BeforeClass 063 public static void setUp() throws Exception { 064 UTIL.startMiniCluster(3); 065 UTIL.getAdmin() 066 .createTable(TableDescriptorBuilder.newBuilder(NAME) 067 .setCoprocessor(ZooKeeperScanPolicyObserver.class.getName()) 068 .setValue(ZooKeeperScanPolicyObserver.ZK_ENSEMBLE_KEY, 069 "localhost:" + UTIL.getZkCluster().getClientPort()) 070 .setValue(ZooKeeperScanPolicyObserver.ZK_SESSION_TIMEOUT_KEY, "2000") 071 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()).build()); 072 TABLE = UTIL.getConnection().getTable(NAME); 073 } 074 075 @AfterClass 076 public static void tearDown() throws Exception { 077 if (TABLE != null) { 078 TABLE.close(); 079 } 080 UTIL.shutdownMiniCluster(); 081 } 082 083 private void setExpireBefore(long time) 084 throws KeeperException, InterruptedException, IOException { 085 ZooKeeper zk = UTIL.getZooKeeperWatcher().getRecoverableZooKeeper().getZooKeeper(); 086 if (zk.exists(ZooKeeperScanPolicyObserver.NODE, false) == null) { 087 zk.create(ZooKeeperScanPolicyObserver.NODE, Bytes.toBytes(time), ZooDefs.Ids.OPEN_ACL_UNSAFE, 088 CreateMode.PERSISTENT); 089 } else { 090 zk.setData(ZooKeeperScanPolicyObserver.NODE, Bytes.toBytes(time), -1); 091 } 092 } 093 094 private void assertValueEquals(int start, int end) throws IOException { 095 for (int i = start; i < end; i++) { 096 assertEquals(i, 097 Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUALIFIER))); 098 } 099 } 100 101 private void assertNotExists(int start, int end) throws IOException { 102 for (int i = start; i < end; i++) { 103 assertFalse(TABLE.exists(new Get(Bytes.toBytes(i)))); 104 } 105 } 106 107 private void put(int start, int end, long ts) throws IOException { 108 for (int i = start; i < end; i++) { 109 TABLE.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, ts, Bytes.toBytes(i))); 110 } 111 } 112 113 @Test 114 public void test() throws IOException, KeeperException, InterruptedException { 115 long now = System.currentTimeMillis(); 116 put(0, 100, now - 10000); 117 assertValueEquals(0, 100); 118 119 setExpireBefore(now - 5000); 120 Thread.sleep(5000); 121 UTIL.getAdmin().flush(NAME); 122 assertNotExists(0, 100); 123 124 put(0, 50, now - 1000); 125 UTIL.getAdmin().flush(NAME); 126 put(50, 100, now - 100); 127 UTIL.getAdmin().flush(NAME); 128 assertValueEquals(0, 100); 129 130 setExpireBefore(now - 500); 131 Thread.sleep(5000); 132 UTIL.getAdmin().majorCompact(NAME); 133 UTIL.waitFor(30000, () -> UTIL.getHBaseCluster().getRegions(NAME).iterator().next() 134 .getStore(FAMILY).getStorefilesCount() == 1); 135 assertNotExists(0, 50); 136 assertValueEquals(50, 100); 137 } 138}