001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Collections;
025import java.util.Optional;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.SynchronousQueue;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Durability;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.client.ResultScanner;
039import org.apache.hadoop.hbase.client.Scan;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.util.Threads;
046import org.apache.hadoop.hbase.wal.WALEdit;
047import org.junit.After;
048import org.junit.AfterClass;
049import org.junit.BeforeClass;
050import org.junit.ClassRule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053
054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
055
056/**
057 * Test that a coprocessor can open a connection and write to another table, inside a hook.
058 */
059@Category({ CoprocessorTests.class, MediumTests.class })
060public class TestOpenTableInCoprocessor {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestOpenTableInCoprocessor.class);
065
066  private static final TableName otherTable = TableName.valueOf("otherTable");
067  private static final TableName primaryTable = TableName.valueOf("primary");
068  private static final byte[] family = new byte[] { 'f' };
069
070  private static boolean[] completed = new boolean[1];
071
072  /**
073   * Custom coprocessor that just copies the write to another table.
074   */
075  public static class SendToOtherTableCoprocessor implements RegionCoprocessor, RegionObserver {
076
077    @Override
078    public Optional<RegionObserver> getRegionObserver() {
079      return Optional.of(this);
080    }
081
082    @Override
083    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
084      final WALEdit edit, final Durability durability) throws IOException {
085      try (Table table = e.getEnvironment().getConnection().getTable(otherTable)) {
086        table.put(put);
087        completed[0] = true;
088      }
089    }
090
091  }
092
093  private static boolean[] completedWithPool = new boolean[1];
094
095  /**
096   * Coprocessor that creates an HTable with a pool to write to another table
097   */
098  public static class CustomThreadPoolCoprocessor implements RegionCoprocessor, RegionObserver {
099
100    /**
101     * @return a pool that has one thread only at every time. A second action added to the pool (
102     *         running concurrently), will cause an exception.
103     */
104    private ExecutorService getPool() {
105      int maxThreads = 1;
106      long keepAliveTime = 60;
107      ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
108        TimeUnit.SECONDS, new SynchronousQueue<>(),
109        new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d").setDaemon(true)
110          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
111      pool.allowCoreThreadTimeOut(true);
112      return pool;
113    }
114
115    @Override
116    public Optional<RegionObserver> getRegionObserver() {
117      return Optional.of(this);
118    }
119
120    @Override
121    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
122      final WALEdit edit, final Durability durability) throws IOException {
123      try (Table table = e.getEnvironment().getConnection().getTable(otherTable, getPool())) {
124        Put p = new Put(new byte[] { 'a' });
125        p.addColumn(family, null, new byte[] { 'a' });
126        try {
127          table.batch(Collections.singletonList(put), null);
128        } catch (InterruptedException e1) {
129          throw new IOException(e1);
130        }
131        completedWithPool[0] = true;
132      }
133    }
134  }
135
136  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
137
138  @BeforeClass
139  public static void setupCluster() throws Exception {
140    UTIL.startMiniCluster();
141  }
142
143  @After
144  public void cleanupTestTable() throws Exception {
145    UTIL.getAdmin().disableTable(primaryTable);
146    UTIL.getAdmin().deleteTable(primaryTable);
147
148    UTIL.getAdmin().disableTable(otherTable);
149    UTIL.getAdmin().deleteTable(otherTable);
150
151  }
152
153  @AfterClass
154  public static void teardownCluster() throws Exception {
155    UTIL.shutdownMiniCluster();
156  }
157
158  @Test
159  public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable {
160    runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed);
161  }
162
163  @Test
164  public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable {
165    runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool);
166  }
167
168  private void runCoprocessorConnectionToRemoteTable(Class<?> clazz, boolean[] completeCheck)
169    throws Throwable {
170    // Check if given class implements RegionObserver.
171    assert (RegionObserver.class.isAssignableFrom(clazz));
172    // add our coprocessor
173    TableDescriptor primaryDescriptor = TableDescriptorBuilder.newBuilder(primaryTable)
174      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCoprocessor(clazz.getName())
175      .build();
176
177    TableDescriptor otherDescriptor = TableDescriptorBuilder.newBuilder(otherTable)
178      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
179
180    Admin admin = UTIL.getAdmin();
181    admin.createTable(primaryDescriptor);
182    admin.createTable(otherDescriptor);
183
184    Table table = UTIL.getConnection().getTable(TableName.valueOf("primary"));
185    Put p = new Put(new byte[] { 'a' });
186    p.addColumn(family, null, new byte[] { 'a' });
187    table.put(p);
188    table.close();
189
190    Table target = UTIL.getConnection().getTable(otherTable);
191    assertTrue("Didn't complete update to target table!", completeCheck[0]);
192    assertEquals("Didn't find inserted row", 1, getKeyValueCount(target));
193    target.close();
194  }
195
196  /**
197   * Count the number of keyvalue in the table. Scans all possible versions
198   * @param table table to scan
199   * @return number of keyvalues over all rows in the table
200   */
201  private int getKeyValueCount(Table table) throws IOException {
202    Scan scan = new Scan();
203    scan.readVersions(Integer.MAX_VALUE - 1);
204
205    ResultScanner results = table.getScanner(scan);
206    int count = 0;
207    for (Result res : results) {
208      count += res.listCells().size();
209      System.out.println(count + ") " + res);
210    }
211    results.close();
212
213    return count;
214  }
215}