001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor.example; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022 023import java.io.IOException; 024import org.apache.hadoop.hbase.HBaseTestingUtil; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 027import org.apache.hadoop.hbase.client.Get; 028import org.apache.hadoop.hbase.client.Put; 029import org.apache.hadoop.hbase.client.Table; 030import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 031import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 032import org.apache.hadoop.hbase.testclassification.MediumTests; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; 036import org.apache.zookeeper.CreateMode; 037import org.apache.zookeeper.KeeperException; 038import org.apache.zookeeper.ZooDefs; 039import org.apache.zookeeper.ZooKeeper; 040import org.junit.jupiter.api.AfterAll; 041import org.junit.jupiter.api.BeforeAll; 042import org.junit.jupiter.api.Tag; 043import org.junit.jupiter.api.Test; 044 045@Tag(CoprocessorTests.TAG) 046@Tag(MediumTests.TAG) 047public class TestZooKeeperScanPolicyObserver { 048 049 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 050 051 private static TableName NAME = TableName.valueOf("TestCP"); 052 053 private static byte[] FAMILY = Bytes.toBytes("cf"); 054 055 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 056 057 private static Table TABLE; 058 059 @BeforeAll 060 public static void setUp() throws Exception { 061 UTIL.startMiniCluster(3); 062 UTIL.getAdmin() 063 .createTable(TableDescriptorBuilder.newBuilder(NAME) 064 .setCoprocessor(ZooKeeperScanPolicyObserver.class.getName()) 065 .setValue(ZooKeeperScanPolicyObserver.ZK_ENSEMBLE_KEY, 066 UTIL.getZkCluster().getAddress().toString()) 067 .setValue(ZooKeeperScanPolicyObserver.ZK_SESSION_TIMEOUT_KEY, "2000") 068 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()).build()); 069 TABLE = UTIL.getConnection().getTable(NAME); 070 } 071 072 @AfterAll 073 public static void tearDown() throws Exception { 074 if (TABLE != null) { 075 TABLE.close(); 076 } 077 UTIL.shutdownMiniCluster(); 078 } 079 080 private void setExpireBefore(long time) 081 throws KeeperException, InterruptedException, IOException { 082 RecoverableZooKeeper recoverableZk = UTIL.getZooKeeperWatcher().getRecoverableZooKeeper(); 083 // we need to call this for setting up the zookeeper connection 084 recoverableZk.reconnectAfterExpiration(); 085 // we have to use the original ZooKeeper as the RecoverableZooKeeper will append a magic prefix 086 // for the data stored on zookeeper 087 ZooKeeper zk = recoverableZk.getZooKeeper(); 088 if (zk.exists(ZooKeeperScanPolicyObserver.NODE, false) == null) { 089 zk.create(ZooKeeperScanPolicyObserver.NODE, Bytes.toBytes(time), ZooDefs.Ids.OPEN_ACL_UNSAFE, 090 CreateMode.PERSISTENT); 091 } else { 092 zk.setData(ZooKeeperScanPolicyObserver.NODE, Bytes.toBytes(time), -1); 093 } 094 } 095 096 private void assertValueEquals(int start, int end) throws IOException { 097 for (int i = start; i < end; i++) { 098 assertEquals(i, 099 Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUALIFIER))); 100 } 101 } 102 103 private void assertNotExists(int start, int end) throws IOException { 104 for (int i = start; i < end; i++) { 105 assertFalse(TABLE.exists(new Get(Bytes.toBytes(i)))); 106 } 107 } 108 109 private void put(int start, int end, long ts) throws IOException { 110 for (int i = start; i < end; i++) { 111 TABLE.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, ts, Bytes.toBytes(i))); 112 } 113 } 114 115 @Test 116 public void test() throws IOException, KeeperException, InterruptedException { 117 long now = EnvironmentEdgeManager.currentTime(); 118 put(0, 100, now - 10000); 119 assertValueEquals(0, 100); 120 121 setExpireBefore(now - 5000); 122 Thread.sleep(5000); 123 UTIL.getAdmin().flush(NAME); 124 assertNotExists(0, 100); 125 126 put(0, 50, now - 1000); 127 UTIL.getAdmin().flush(NAME); 128 put(50, 100, now - 100); 129 UTIL.getAdmin().flush(NAME); 130 assertValueEquals(0, 100); 131 132 setExpireBefore(now - 500); 133 Thread.sleep(5000); 134 UTIL.getAdmin().majorCompact(NAME); 135 UTIL.waitFor(30000, () -> UTIL.getHBaseCluster().getRegions(NAME).iterator().next() 136 .getStore(FAMILY).getStorefilesCount() == 1); 137 assertNotExists(0, 50); 138 assertValueEquals(50, 100); 139 } 140}