001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Collections; 025import java.util.Optional; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.SynchronousQueue; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.HColumnDescriptor; 033import org.apache.hadoop.hbase.HTableDescriptor; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.Durability; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.Result; 039import org.apache.hadoop.hbase.client.ResultScanner; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.client.Table; 042import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 043import org.apache.hadoop.hbase.testclassification.MediumTests; 044import org.apache.hadoop.hbase.util.Threads; 045import org.apache.hadoop.hbase.wal.WALEdit; 046import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 047import org.junit.After; 048import org.junit.AfterClass; 049import org.junit.BeforeClass; 050import org.junit.ClassRule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053 054/** 055 * Test that a coprocessor can open a connection and write to another table, inside a hook. 056 */ 057@Category({CoprocessorTests.class, MediumTests.class}) 058public class TestOpenTableInCoprocessor { 059 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestOpenTableInCoprocessor.class); 063 064 private static final TableName otherTable = TableName.valueOf("otherTable"); 065 private static final TableName primaryTable = TableName.valueOf("primary"); 066 private static final byte[] family = new byte[] { 'f' }; 067 068 private static boolean[] completed = new boolean[1]; 069 /** 070 * Custom coprocessor that just copies the write to another table. 071 */ 072 public static class SendToOtherTableCoprocessor implements RegionCoprocessor, RegionObserver { 073 074 @Override 075 public Optional<RegionObserver> getRegionObserver() { 076 return Optional.of(this); 077 } 078 079 @Override 080 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, 081 final WALEdit edit, final Durability durability) throws IOException { 082 try (Table table = e.getEnvironment().getConnection(). 083 getTable(otherTable)) { 084 table.put(put); 085 completed[0] = true; 086 } 087 } 088 089 } 090 091 private static boolean[] completedWithPool = new boolean[1]; 092 /** 093 * Coprocessor that creates an HTable with a pool to write to another table 094 */ 095 public static class CustomThreadPoolCoprocessor implements RegionCoprocessor, RegionObserver { 096 097 /** 098 * @return a pool that has one thread only at every time. A second action added to the pool ( 099 * running concurrently), will cause an exception. 100 */ 101 private ExecutorService getPool() { 102 int maxThreads = 1; 103 long keepAliveTime = 60; 104 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, 105 TimeUnit.SECONDS, new SynchronousQueue<>(), 106 new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d").setDaemon(true) 107 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 108 pool.allowCoreThreadTimeOut(true); 109 return pool; 110 } 111 112 @Override 113 public Optional<RegionObserver> getRegionObserver() { 114 return Optional.of(this); 115 } 116 117 @Override 118 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, 119 final WALEdit edit, final Durability durability) throws IOException { 120 try (Table table = e.getEnvironment().getConnection().getTable(otherTable, getPool())) { 121 Put p = new Put(new byte[]{'a'}); 122 p.addColumn(family, null, new byte[]{'a'}); 123 try { 124 table.batch(Collections.singletonList(put), null); 125 } catch (InterruptedException e1) { 126 throw new IOException(e1); 127 } 128 completedWithPool[0] = true; 129 } 130 } 131 } 132 133 private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); 134 135 @BeforeClass 136 public static void setupCluster() throws Exception { 137 UTIL.startMiniCluster(); 138 } 139 140 @After 141 public void cleanupTestTable() throws Exception { 142 UTIL.getAdmin().disableTable(primaryTable); 143 UTIL.getAdmin().deleteTable(primaryTable); 144 145 UTIL.getAdmin().disableTable(otherTable); 146 UTIL.getAdmin().deleteTable(otherTable); 147 148 } 149 150 @AfterClass 151 public static void teardownCluster() throws Exception { 152 UTIL.shutdownMiniCluster(); 153 } 154 155 @Test 156 public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable { 157 runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed); 158 } 159 160 @Test 161 public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable { 162 runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool); 163 } 164 165 private void runCoprocessorConnectionToRemoteTable(Class clazz, boolean[] completeCheck) 166 throws Throwable { 167 // Check if given class implements RegionObserver. 168 assert(RegionObserver.class.isAssignableFrom(clazz)); 169 HTableDescriptor primary = new HTableDescriptor(primaryTable); 170 primary.addFamily(new HColumnDescriptor(family)); 171 // add our coprocessor 172 primary.addCoprocessor(clazz.getName()); 173 174 HTableDescriptor other = new HTableDescriptor(otherTable); 175 other.addFamily(new HColumnDescriptor(family)); 176 177 178 Admin admin = UTIL.getAdmin(); 179 admin.createTable(primary); 180 admin.createTable(other); 181 182 Table table = UTIL.getConnection().getTable(TableName.valueOf("primary")); 183 Put p = new Put(new byte[] { 'a' }); 184 p.addColumn(family, null, new byte[]{'a'}); 185 table.put(p); 186 table.close(); 187 188 Table target = UTIL.getConnection().getTable(otherTable); 189 assertTrue("Didn't complete update to target table!", completeCheck[0]); 190 assertEquals("Didn't find inserted row", 1, getKeyValueCount(target)); 191 target.close(); 192 } 193 194 /** 195 * Count the number of keyvalue in the table. Scans all possible versions 196 * @param table table to scan 197 * @return number of keyvalues over all rows in the table 198 * @throws IOException 199 */ 200 private int getKeyValueCount(Table table) throws IOException { 201 Scan scan = new Scan(); 202 scan.setMaxVersions(Integer.MAX_VALUE - 1); 203 204 ResultScanner results = table.getScanner(scan); 205 int count = 0; 206 for (Result res : results) { 207 count += res.listCells().size(); 208 System.out.println(count + ") " + res); 209 } 210 results.close(); 211 212 return count; 213 } 214}