001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.Collections;
025import java.util.Optional;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.SynchronousQueue;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Durability;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.Result;
037import org.apache.hadoop.hbase.client.ResultScanner;
038import org.apache.hadoop.hbase.client.Scan;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.apache.hadoop.hbase.util.Threads;
045import org.apache.hadoop.hbase.wal.WALEdit;
046import org.junit.jupiter.api.AfterAll;
047import org.junit.jupiter.api.AfterEach;
048import org.junit.jupiter.api.BeforeAll;
049import org.junit.jupiter.api.Tag;
050import org.junit.jupiter.api.Test;
051
052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
053
054/**
055 * Test that a coprocessor can open a connection and write to another table, inside a hook.
056 */
057@Tag(CoprocessorTests.TAG)
058@Tag(MediumTests.TAG)
059public class TestOpenTableInCoprocessor {
060
061  private static final TableName otherTable = TableName.valueOf("otherTable");
062  private static final TableName primaryTable = TableName.valueOf("primary");
063  private static final byte[] family = new byte[] { 'f' };
064
065  private static boolean[] completed = new boolean[1];
066
067  /**
068   * Custom coprocessor that just copies the write to another table.
069   */
070  public static class SendToOtherTableCoprocessor implements RegionCoprocessor, RegionObserver {
071
072    @Override
073    public Optional<RegionObserver> getRegionObserver() {
074      return Optional.of(this);
075    }
076
077    @Override
078    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
079      final Put put, final WALEdit edit, final Durability durability) throws IOException {
080      try (Table table = e.getEnvironment().getConnection().getTable(otherTable)) {
081        table.put(put);
082        completed[0] = true;
083      }
084    }
085
086  }
087
088  private static boolean[] completedWithPool = new boolean[1];
089
090  /**
091   * Coprocessor that creates an HTable with a pool to write to another table
092   */
093  public static class CustomThreadPoolCoprocessor implements RegionCoprocessor, RegionObserver {
094
095    /**
096     * @return a pool that has one thread only at every time. A second action added to the pool (
097     *         running concurrently), will cause an exception.
098     */
099    private ExecutorService getPool() {
100      int maxThreads = 1;
101      long keepAliveTime = 60;
102      ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
103        TimeUnit.SECONDS, new SynchronousQueue<>(),
104        new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d").setDaemon(true)
105          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
106      pool.allowCoreThreadTimeOut(true);
107      return pool;
108    }
109
110    @Override
111    public Optional<RegionObserver> getRegionObserver() {
112      return Optional.of(this);
113    }
114
115    @Override
116    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
117      final Put put, final WALEdit edit, final Durability durability) throws IOException {
118      try (Table table = e.getEnvironment().getConnection().getTable(otherTable, getPool())) {
119        Put p = new Put(new byte[] { 'a' });
120        p.addColumn(family, null, new byte[] { 'a' });
121        try {
122          table.batch(Collections.singletonList(put), null);
123        } catch (InterruptedException e1) {
124          throw new IOException(e1);
125        }
126        completedWithPool[0] = true;
127      }
128    }
129  }
130
131  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
132
133  @BeforeAll
134  public static void setupCluster() throws Exception {
135    UTIL.startMiniCluster();
136  }
137
138  @AfterEach
139  public void cleanupTestTable() throws Exception {
140    UTIL.getAdmin().disableTable(primaryTable);
141    UTIL.getAdmin().deleteTable(primaryTable);
142
143    UTIL.getAdmin().disableTable(otherTable);
144    UTIL.getAdmin().deleteTable(otherTable);
145
146  }
147
148  @AfterAll
149  public static void teardownCluster() throws Exception {
150    UTIL.shutdownMiniCluster();
151  }
152
153  @Test
154  public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable {
155    runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed);
156  }
157
158  @Test
159  public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable {
160    runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool);
161  }
162
163  private void runCoprocessorConnectionToRemoteTable(Class<?> clazz, boolean[] completeCheck)
164    throws Throwable {
165    // Check if given class implements RegionObserver.
166    assert (RegionObserver.class.isAssignableFrom(clazz));
167    // add our coprocessor
168    TableDescriptor primaryDescriptor = TableDescriptorBuilder.newBuilder(primaryTable)
169      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCoprocessor(clazz.getName())
170      .build();
171
172    TableDescriptor otherDescriptor = TableDescriptorBuilder.newBuilder(otherTable)
173      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
174
175    Admin admin = UTIL.getAdmin();
176    admin.createTable(primaryDescriptor);
177    admin.createTable(otherDescriptor);
178
179    Table table = UTIL.getConnection().getTable(TableName.valueOf("primary"));
180    Put p = new Put(new byte[] { 'a' });
181    p.addColumn(family, null, new byte[] { 'a' });
182    table.put(p);
183    table.close();
184
185    Table target = UTIL.getConnection().getTable(otherTable);
186    assertTrue(completeCheck[0], "Didn't complete update to target table!");
187    assertEquals(1, getKeyValueCount(target), "Didn't find inserted row");
188    target.close();
189  }
190
191  /**
192   * Count the number of keyvalue in the table. Scans all possible versions
193   * @param table table to scan
194   * @return number of keyvalues over all rows in the table
195   */
196  private int getKeyValueCount(Table table) throws IOException {
197    Scan scan = new Scan();
198    scan.readVersions(Integer.MAX_VALUE - 1);
199
200    ResultScanner results = table.getScanner(scan);
201    int count = 0;
202    for (Result res : results) {
203      count += res.listCells().size();
204      System.out.println(count + ") " + res);
205    }
206    results.close();
207
208    return count;
209  }
210}