001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor.example;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022
023import java.io.IOException;
024import org.apache.hadoop.hbase.HBaseTestingUtil;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
027import org.apache.hadoop.hbase.client.Get;
028import org.apache.hadoop.hbase.client.Put;
029import org.apache.hadoop.hbase.client.Table;
030import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
031import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
032import org.apache.hadoop.hbase.testclassification.MediumTests;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
036import org.apache.zookeeper.CreateMode;
037import org.apache.zookeeper.KeeperException;
038import org.apache.zookeeper.ZooDefs;
039import org.apache.zookeeper.ZooKeeper;
040import org.junit.jupiter.api.AfterAll;
041import org.junit.jupiter.api.BeforeAll;
042import org.junit.jupiter.api.Tag;
043import org.junit.jupiter.api.Test;
044
045@Tag(CoprocessorTests.TAG)
046@Tag(MediumTests.TAG)
047public class TestZooKeeperScanPolicyObserver {
048
049  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
050
051  private static TableName NAME = TableName.valueOf("TestCP");
052
053  private static byte[] FAMILY = Bytes.toBytes("cf");
054
055  private static byte[] QUALIFIER = Bytes.toBytes("cq");
056
057  private static Table TABLE;
058
059  @BeforeAll
060  public static void setUp() throws Exception {
061    UTIL.startMiniCluster(3);
062    UTIL.getAdmin()
063      .createTable(TableDescriptorBuilder.newBuilder(NAME)
064        .setCoprocessor(ZooKeeperScanPolicyObserver.class.getName())
065        .setValue(ZooKeeperScanPolicyObserver.ZK_ENSEMBLE_KEY,
066          UTIL.getZkCluster().getAddress().toString())
067        .setValue(ZooKeeperScanPolicyObserver.ZK_SESSION_TIMEOUT_KEY, "2000")
068        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()).build());
069    TABLE = UTIL.getConnection().getTable(NAME);
070  }
071
072  @AfterAll
073  public static void tearDown() throws Exception {
074    if (TABLE != null) {
075      TABLE.close();
076    }
077    UTIL.shutdownMiniCluster();
078  }
079
080  private void setExpireBefore(long time)
081    throws KeeperException, InterruptedException, IOException {
082    RecoverableZooKeeper recoverableZk = UTIL.getZooKeeperWatcher().getRecoverableZooKeeper();
083    // we need to call this for setting up the zookeeper connection
084    recoverableZk.reconnectAfterExpiration();
085    // we have to use the original ZooKeeper as the RecoverableZooKeeper will append a magic prefix
086    // for the data stored on zookeeper
087    ZooKeeper zk = recoverableZk.getZooKeeper();
088    if (zk.exists(ZooKeeperScanPolicyObserver.NODE, false) == null) {
089      zk.create(ZooKeeperScanPolicyObserver.NODE, Bytes.toBytes(time), ZooDefs.Ids.OPEN_ACL_UNSAFE,
090        CreateMode.PERSISTENT);
091    } else {
092      zk.setData(ZooKeeperScanPolicyObserver.NODE, Bytes.toBytes(time), -1);
093    }
094  }
095
096  private void assertValueEquals(int start, int end) throws IOException {
097    for (int i = start; i < end; i++) {
098      assertEquals(i,
099        Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUALIFIER)));
100    }
101  }
102
103  private void assertNotExists(int start, int end) throws IOException {
104    for (int i = start; i < end; i++) {
105      assertFalse(TABLE.exists(new Get(Bytes.toBytes(i))));
106    }
107  }
108
109  private void put(int start, int end, long ts) throws IOException {
110    for (int i = start; i < end; i++) {
111      TABLE.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, ts, Bytes.toBytes(i)));
112    }
113  }
114
115  @Test
116  public void test() throws IOException, KeeperException, InterruptedException {
117    long now = EnvironmentEdgeManager.currentTime();
118    put(0, 100, now - 10000);
119    assertValueEquals(0, 100);
120
121    setExpireBefore(now - 5000);
122    Thread.sleep(5000);
123    UTIL.getAdmin().flush(NAME);
124    assertNotExists(0, 100);
125
126    put(0, 50, now - 1000);
127    UTIL.getAdmin().flush(NAME);
128    put(50, 100, now - 100);
129    UTIL.getAdmin().flush(NAME);
130    assertValueEquals(0, 100);
131
132    setExpireBefore(now - 500);
133    Thread.sleep(5000);
134    UTIL.getAdmin().majorCompact(NAME);
135    UTIL.waitFor(30000, () -> UTIL.getHBaseCluster().getRegions(NAME).iterator().next()
136      .getStore(FAMILY).getStorefilesCount() == 1);
137    assertNotExists(0, 50);
138    assertValueEquals(50, 100);
139  }
140}