/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test that a coprocessor can open a connection and write to another table, inside a hook.
 */
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestOpenTableInCoprocessor {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestOpenTableInCoprocessor.class);

  private static final TableName otherTable = TableName.valueOf("otherTable");
  private static final TableName primaryTable = TableName.valueOf("primary");
  private static final byte[] family = new byte[] { 'f' };

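  // One-element holder the coprocessor flips once it has copied the write to the other table.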
  private static boolean[] completed = new boolean[1];

  /**
   * Custom coprocessor that just copies the write to another table.
   */
  public static class SendToOtherTableCoprocessor implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
        final WALEdit edit, final Durability durability) throws IOException {
      // Open the other table through the environment's connection and copy the incoming put.
      try (Table table = e.getEnvironment().getConnection().getTable(otherTable)) {
        table.put(put);
        completed[0] = true;
      }
    }
  }

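  // One-element holder flipped once the pooled write to the other table completes.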
  private static boolean[] completedWithPool = new boolean[1];

  /**
   * Coprocessor that opens a Table backed by a custom thread pool to write to another table.
   */
  public static class CustomThreadPoolCoprocessor implements RegionCoprocessor, RegionObserver {

    /**
     * @return a pool that runs at most one thread at any time. A second task submitted while
     *         another is still running will be rejected with an exception.
     */
    private ExecutorService getPool() {
      int maxThreads = 1;
      long keepAliveTime = 60;
      // With a single thread and a SynchronousQueue there is no task buffering, so a concurrent
      // second submission is rejected rather than queued.
      ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
        TimeUnit.SECONDS, new SynchronousQueue<>(),
        new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d").setDaemon(true)
          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
      pool.allowCoreThreadTimeOut(true);
      return pool;
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
        final WALEdit edit, final Durability durability) throws IOException {
      // Copy the incoming put to the other table via a batch call running on the custom pool.
      try (Table table = e.getEnvironment().getConnection().getTable(otherTable, getPool())) {
        try {
          table.batch(Collections.singletonList(put), null);
        } catch (InterruptedException e1) {
          throw new IOException(e1);
        }
        completedWithPool[0] = true;
      }
    }
  }

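  // Mini-cluster shared by all tests in this class.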
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  @BeforeClass
  public static void setupCluster() throws Exception {
    UTIL.startMiniCluster();
  }

  @After
  public void cleanupTestTable() throws Exception {
    UTIL.getAdmin().disableTable(primaryTable);
    UTIL.getAdmin().deleteTable(primaryTable);

    UTIL.getAdmin().disableTable(otherTable);
    UTIL.getAdmin().deleteTable(otherTable);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Test
  public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable {
    runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed);
  }

  @Test
  public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable {
    runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool);
  }

  private void runCoprocessorConnectionToRemoteTable(Class<?> clazz, boolean[] completeCheck)
      throws Throwable {
    // Check that the given class implements RegionObserver.
    assertTrue("Coprocessor must implement RegionObserver",
      RegionObserver.class.isAssignableFrom(clazz));
    // Add our coprocessor to the primary table.
    TableDescriptor primaryDescriptor = TableDescriptorBuilder.newBuilder(primaryTable)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCoprocessor(clazz.getName())
      .build();

    TableDescriptor otherDescriptor = TableDescriptorBuilder.newBuilder(otherTable)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();

    Admin admin = UTIL.getAdmin();
    admin.createTable(primaryDescriptor);
    admin.createTable(otherDescriptor);

    // Write to the primary table; the coprocessor hook copies the put to the other table.
    try (Table table = UTIL.getConnection().getTable(primaryTable)) {
      Put p = new Put(new byte[] { 'a' });
      p.addColumn(family, null, new byte[] { 'a' });
      table.put(p);
    }

    // Verify the copy landed in the target table.
    try (Table target = UTIL.getConnection().getTable(otherTable)) {
      assertTrue("Didn't complete update to target table!", completeCheck[0]);
      assertEquals("Didn't find inserted row", 1, getKeyValueCount(target));
    }
  }

  /**
   * Count the number of KeyValues in the table. Scans all possible versions.
   * @param table table to scan
   * @return number of KeyValues over all rows in the table
   * @throws IOException if the scan fails
   */
  private int getKeyValueCount(Table table) throws IOException {
    Scan scan = new Scan();
    scan.readVersions(Integer.MAX_VALUE - 1);

    int count = 0;
    try (ResultScanner results = table.getScanner(scan)) {
      for (Result res : results) {
        count += res.listCells().size();
        System.out.println(count + ") " + res);
      }
    }
    return count;
  }
}