001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertFalse;
022
023import java.io.IOException;
024import java.net.URI;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseTestingUtil;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.junit.jupiter.api.AfterEach;
031import org.junit.jupiter.api.BeforeEach;
032import org.junit.jupiter.api.TestInfo;
033import org.junit.jupiter.params.ParameterizedTest;
034import org.junit.jupiter.params.provider.EnumSource;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038public abstract class BasicReadWriteWithDifferentConnectionRegistriesTestBase {
039
040  private static final Logger LOG =
041    LoggerFactory.getLogger(TestBasicReadWriteWithDifferentConnectionRegistries.class);
042
043  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
044
045  public enum RegistryImpl {
046    ZK,
047    RPC,
048    ZK_URI,
049    RPC_URI
050  }
051
052  private TableName tableName;
053
054  private byte[] FAMILY = Bytes.toBytes("family");
055
056  private Connection conn;
057
058  protected abstract Configuration getConf();
059
060  protected abstract Connection createConn(URI uri) throws IOException;
061
062  private void init(RegistryImpl impl) throws Exception {
063    switch (impl) {
064      case ZK: {
065        Configuration conf = getConf();
066        conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
067          ZKConnectionRegistry.class, ConnectionRegistry.class);
068        String quorum = UTIL.getZkCluster().getAddress().toString();
069        String path = UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT);
070        conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, quorum);
071        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, path);
072        LOG.info("connect to cluster through zk quorum={} and parent={}", quorum, path);
073        conn = ConnectionFactory.createConnection(conf);
074        break;
075      }
076      case RPC: {
077        Configuration conf = getConf();
078        conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
079          RpcConnectionRegistry.class, ConnectionRegistry.class);
080        String bootstrapServers =
081          UTIL.getMiniHBaseCluster().getMaster().getServerName().getAddress().toString();
082        conf.set(RpcConnectionRegistry.BOOTSTRAP_NODES, bootstrapServers);
083        LOG.info("connect to cluster through rpc bootstrap servers={}", bootstrapServers);
084        conn = ConnectionFactory.createConnection(conf);
085        break;
086      }
087      case ZK_URI: {
088        String quorum = UTIL.getZkCluster().getAddress().toString();
089        String path = UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT);
090        URI connectionUri = new URI("hbase+zk://" + quorum + path);
091        LOG.info("connect to cluster through connection url: {}", connectionUri);
092        conn = createConn(connectionUri);
093        break;
094      }
095      case RPC_URI: {
096        URI connectionUri = new URI("hbase+rpc://"
097          + UTIL.getMiniHBaseCluster().getMaster().getServerName().getAddress().toString());
098        LOG.info("connect to cluster through connection url: {}", connectionUri);
099        conn = createConn(connectionUri);
100        break;
101      }
102      default:
103        throw new IllegalArgumentException("Unknown impl: " + impl);
104    }
105    try (Admin admin = conn.getAdmin()) {
106      admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
107        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
108    }
109  }
110
111  @BeforeEach
112  public void setUp(TestInfo testInfo) {
113    String displayName = testInfo.getTestMethod().get().getName() + testInfo.getDisplayName();
114    tableName = TableName.valueOf(displayName.replaceAll("[ \\[\\]]", "_"));
115  }
116
117  @AfterEach
118  public void tearDown() throws Exception {
119    try (Admin admin = conn.getAdmin()) {
120      admin.disableTable(tableName);
121      admin.deleteTable(tableName);
122    }
123    conn.close();
124  }
125
126  @ParameterizedTest
127  @EnumSource(value = RegistryImpl.class)
128  public void testReadWrite(RegistryImpl impl) throws Exception {
129    init(impl);
130    byte[] row = Bytes.toBytes("row");
131    byte[] qualifier = Bytes.toBytes("qualifier");
132    byte[] value = Bytes.toBytes("value");
133    try (Table table = conn.getTable(tableName)) {
134      Put put = new Put(row).addColumn(FAMILY, qualifier, value);
135      table.put(put);
136      Result result = table.get(new Get(row));
137      assertArrayEquals(value, result.getValue(FAMILY, qualifier));
138      table.delete(new Delete(row));
139      assertFalse(table.exists(new Get(row)));
140    }
141  }
142}