/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test that a coprocessor can open a connection and write to another table, inside a hook.
 */
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestOpenTableInCoprocessor {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestOpenTableInCoprocessor.class);

  private static final TableName otherTable = TableName.valueOf("otherTable");
  private static final TableName primaryTable = TableName.valueOf("primary");
  private static final byte[] family = new byte[] { 'f' };

  // Flag set by the coprocessor once it has copied the write into the other table
  // (a boolean[] so the static can be mutated in place by the hook).
  private static boolean[] completed = new boolean[1];

  /**
   * Custom coprocessor that just copies the write to another table.
   */
  public static class SendToOtherTableCoprocessor implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
        final WALEdit edit, final Durability durability) throws IOException {
      try (Table table = e.getEnvironment().getConnection().getTable(otherTable)) {
        table.put(put);
        completed[0] = true;
      }
    }
  }
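  // The hook above reuses the region server's shared Connection via
  // RegionCoprocessorEnvironment#getConnection() instead of opening its own.
  // A minimal sketch of the same pattern (hypothetical table/variable names):
  //
  //   try (Table t = env.getConnection().getTable(TableName.valueOf("audit"))) {
  //     t.put(somePut);
  //   }
  //
  // The Table handle is lightweight and must be closed after use; the shared
  // Connection itself is owned by the hosting server and must not be closed here.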
  // Flag set by the pool-based coprocessor once its batch to the other table succeeds.
  private static boolean[] completedWithPool = new boolean[1];

  /**
   * Coprocessor that creates a Table backed by a custom thread pool to write to another table.
   */
  public static class CustomThreadPoolCoprocessor implements RegionCoprocessor, RegionObserver {

    /**
     * @return a pool that holds at most one thread at any time. A second task submitted while the
     *         first is still running will be rejected with a RejectedExecutionException.
     */
    private ExecutorService getPool() {
      int maxThreads = 1;
      long keepAliveTime = 60;
      ThreadPoolExecutor pool =
          new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
              new SynchronousQueue<>(), Threads.newDaemonThreadFactory("hbase-table"));
      pool.allowCoreThreadTimeOut(true);
      return pool;
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
        final WALEdit edit, final Durability durability) throws IOException {
      try (Table table = e.getEnvironment().getConnection().getTable(otherTable, getPool())) {
        try {
          // use batch() rather than put() so the write actually goes through the supplied pool
          table.batch(Collections.singletonList(put), null);
        } catch (InterruptedException e1) {
          throw new IOException(e1);
        }
        completedWithPool[0] = true;
      }
    }
  }

  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();

  @BeforeClass
  public static void setupCluster() throws Exception {
    UTIL.startMiniCluster();
  }

  @After
  public void cleanupTestTable() throws Exception {
    UTIL.getAdmin().disableTable(primaryTable);
    UTIL.getAdmin().deleteTable(primaryTable);

    UTIL.getAdmin().disableTable(otherTable);
    UTIL.getAdmin().deleteTable(otherTable);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Test
  public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable {
    runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed);
  }

  @Test
  public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable {
    runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool);
  }
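  // Shared driver for both tests above. Outline of the flow it exercises:
  //
  //   client put('a') into "primary"  -->  prePut hook on the region
  //     -->  hook writes the same Put into "otherTable" and flips its flag
  //
  // Afterwards we assert that the flag was set and that exactly one cell
  // landed in the other table.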
  private void runCoprocessorConnectionToRemoteTable(Class<?> clazz, boolean[] completeCheck)
      throws Throwable {
    // Check that the given class implements RegionObserver.
    assertTrue(RegionObserver.class.isAssignableFrom(clazz));
    HTableDescriptor primary = new HTableDescriptor(primaryTable);
    primary.addFamily(new HColumnDescriptor(family));
    // add our coprocessor
    primary.addCoprocessor(clazz.getName());

    HTableDescriptor other = new HTableDescriptor(otherTable);
    other.addFamily(new HColumnDescriptor(family));

    Admin admin = UTIL.getAdmin();
    admin.createTable(primary);
    admin.createTable(other);

    Table table = UTIL.getConnection().getTable(primaryTable);
    Put p = new Put(new byte[] { 'a' });
    p.addColumn(family, null, new byte[] { 'a' });
    table.put(p);
    table.close();

    Table target = UTIL.getConnection().getTable(otherTable);
    assertTrue("Didn't complete update to target table!", completeCheck[0]);
    assertEquals("Didn't find inserted row", 1, getKeyValueCount(target));
    target.close();
  }

  /**
   * Count the number of KeyValues in the table. Scans all possible versions.
   * @param table table to scan
   * @return number of KeyValues over all rows in the table
   */
  private int getKeyValueCount(Table table) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(Integer.MAX_VALUE - 1);

    ResultScanner results = table.getScanner(scan);
    int count = 0;
    for (Result res : results) {
      count += res.listCells().size();
      System.out.println(count + ") " + res);
    }
    results.close();

    return count;
  }
}
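// A likely way to run just this test, assuming the standard HBase Maven build
// (invoked from the module that contains this class):
//
//   mvn test -Dtest=TestOpenTableInCoprocessor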