/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Test that a coprocessor can open a connection and write to another table, inside a hook.
 */
@Tag(CoprocessorTests.TAG)
@Tag(MediumTests.TAG)
public class TestOpenTableInCoprocessor {

  private static final TableName otherTable = TableName.valueOf("otherTable");
  private static final TableName primaryTable = TableName.valueOf("primary");
  private static final byte[] family = new byte[] { 'f' };

  // Single-element array so the coprocessor (running in the region server) can flag completion
  // back to the test without needing a shared mutable field holder class.
  private static boolean[] completed = new boolean[1];

  /**
   * Custom coprocessor that just copies the write to another table.
   */
  public static class SendToOtherTableCoprocessor implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Mirrors every write to {@link #otherTable} using the environment's shared connection.
     */
    @Override
    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Put put, final WALEdit edit, final Durability durability) throws IOException {
      try (Table table = e.getEnvironment().getConnection().getTable(otherTable)) {
        table.put(put);
        completed[0] = true;
      }
    }

  }

  private static boolean[] completedWithPool = new boolean[1];

  /**
   * Coprocessor that creates an HTable with a pool to write to another table
   */
  public static class CustomThreadPoolCoprocessor implements RegionCoprocessor, RegionObserver {

    /**
     * @return a pool that has at most one thread at any time. A second action added to the pool
     *         (running concurrently) will cause an exception.
     */
    private ExecutorService getPool() {
      int maxThreads = 1;
      long keepAliveTime = 60;
      ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
        TimeUnit.SECONDS, new SynchronousQueue<>(),
        new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d").setDaemon(true)
          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
      pool.allowCoreThreadTimeOut(true);
      return pool;
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    /**
     * Mirrors every write to {@link #otherTable}, going through a custom single-thread pool to
     * verify that user-supplied executors work inside a coprocessor hook.
     */
    @Override
    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Put put, final WALEdit edit, final Durability durability) throws IOException {
      try (Table table = e.getEnvironment().getConnection().getTable(otherTable, getPool())) {
        try {
          table.batch(Collections.singletonList(put), null);
        } catch (InterruptedException e1) {
          // Preserve the interrupt cause; prePut may only throw IOException.
          throw new IOException(e1);
        }
        completedWithPool[0] = true;
      }
    }
  }

  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();

  @BeforeAll
  public static void setupCluster() throws Exception {
    UTIL.startMiniCluster();
  }

  @AfterEach
  public void cleanupTestTable() throws Exception {
    UTIL.getAdmin().disableTable(primaryTable);
    UTIL.getAdmin().deleteTable(primaryTable);

    UTIL.getAdmin().disableTable(otherTable);
    UTIL.getAdmin().deleteTable(otherTable);
  }

  @AfterAll
  public static void teardownCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Test
  public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable {
    runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed);
  }

  @Test
  public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable {
    runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool);
  }

  /**
   * Creates the primary table with the given coprocessor attached plus the target table, writes
   * one row to the primary table, and verifies the coprocessor copied the write to the target.
   * @param clazz         coprocessor class to install on the primary table; must implement
   *                      {@link RegionObserver}
   * @param completeCheck single-element flag the coprocessor sets once its copy succeeded
   */
  private void runCoprocessorConnectionToRemoteTable(Class<?> clazz, boolean[] completeCheck)
    throws Throwable {
    // Check if given class implements RegionObserver. Use assertTrue rather than a bare
    // "assert", which is silently skipped unless the JVM runs with -ea.
    assertTrue(RegionObserver.class.isAssignableFrom(clazz),
      clazz.getName() + " must implement RegionObserver");
    // add our coprocessor
    TableDescriptor primaryDescriptor = TableDescriptorBuilder.newBuilder(primaryTable)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCoprocessor(clazz.getName())
      .build();

    TableDescriptor otherDescriptor = TableDescriptorBuilder.newBuilder(otherTable)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();

    Admin admin = UTIL.getAdmin();
    admin.createTable(primaryDescriptor);
    admin.createTable(otherDescriptor);

    // Use the shared constant rather than re-parsing the hard-coded name "primary".
    try (Table table = UTIL.getConnection().getTable(primaryTable)) {
      Put p = new Put(new byte[] { 'a' });
      p.addColumn(family, null, new byte[] { 'a' });
      table.put(p);
    }

    // try-with-resources ensures the table is closed even when an assertion fails.
    try (Table target = UTIL.getConnection().getTable(otherTable)) {
      assertTrue(completeCheck[0], "Didn't complete update to target table!");
      assertEquals(1, getKeyValueCount(target), "Didn't find inserted row");
    }
  }

  /**
   * Count the number of keyvalue in the table. Scans all possible versions
   * @param table table to scan
   * @return number of keyvalues over all rows in the table
   */
  private int getKeyValueCount(Table table) throws IOException {
    Scan scan = new Scan();
    // MAX_VALUE - 1 avoids overflow inside version accounting; effectively "all versions".
    scan.readVersions(Integer.MAX_VALUE - 1);

    int count = 0;
    try (ResultScanner results = table.getScanner(scan)) {
      for (Result res : results) {
        count += res.listCells().size();
        System.out.println(count + ") " + res);
      }
    }
    return count;
  }
}