001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.PrintWriter; 022import java.io.StringWriter; 023import java.security.PrivilegedExceptionAction; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.Put; 027import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 028import org.apache.hadoop.hbase.client.Table; 029import org.apache.hadoop.hbase.security.User; 030import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 031import org.apache.hadoop.util.StringUtils; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * MultiThreadedWriter that helps in testing ACL 037 */ 038public class MultiThreadedWriterWithACL extends MultiThreadedWriter { 039 040 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterWithACL.class); 041 private User userOwner; 042 043 public MultiThreadedWriterWithACL(LoadTestDataGenerator dataGen, Configuration conf, 044 TableName tableName, User userOwner) throws IOException { 045 super(dataGen, conf, tableName); 046 this.userOwner = userOwner; 047 } 048 049 @Override 050 public void start(long startKey, long endKey, int numThreads) throws IOException { 051 super.start(startKey, endKey, numThreads); 052 } 053 054 @Override 055 protected void createWriterThreads(int numThreads) throws IOException { 056 for (int i = 0; i < numThreads; ++i) { 057 HBaseWriterThread writer = new HBaseWriterThreadWithACL(i); 058 writers.add(writer); 059 } 060 } 061 062 public class HBaseWriterThreadWithACL extends HBaseWriterThread { 063 064 private Table table; 065 private WriteAccessAction writerAction = new WriteAccessAction(); 066 067 public HBaseWriterThreadWithACL(int writerId) throws IOException { 068 super(writerId); 069 } 070 071 @Override 072 protected Table createTable() throws IOException { 073 return null; 074 } 075 076 @Override 077 protected void closeHTable() { 078 if (table != null) { 079 try { 080 table.close(); 081 } catch (Exception e) { 082 LOG.error("Error in closing the table " + table.getName(), e); 083 } 084 } 085 } 086 087 @Override 088 public void insert(final Table table, Put put, final long keyBase) { 089 final long start = EnvironmentEdgeManager.currentTime(); 090 try { 091 put = (Put) dataGenerator.beforeMutate(keyBase, put); 092 writerAction.setPut(put); 093 writerAction.setKeyBase(keyBase); 094 writerAction.setStartTime(start); 095 userOwner.runAs(writerAction); 096 } catch (IOException e) { 097 recordFailure(table, put, keyBase, start, e); 098 } catch (InterruptedException e) { 099 failedKeySet.add(keyBase); 100 } 101 } 102 103 class WriteAccessAction implements PrivilegedExceptionAction<Object> { 104 private Put put; 105 private long keyBase; 106 private long start; 107 108 public WriteAccessAction() { 109 } 110 111 public void setPut(final Put put) { 112 this.put = put; 113 } 114 115 public void setKeyBase(final long keyBase) { 116 this.keyBase = keyBase; 117 } 118 119 public void setStartTime(final long start) { 120 this.start = start; 121 } 122 123 @Override 124 public Object run() throws Exception { 125 try { 126 if (table == null) { 127 table = connection.getTable(tableName); 128 } 129 table.put(put); 130 } catch (IOException e) { 131 recordFailure(table, put, keyBase, start, e); 132 } 133 return null; 134 } 135 } 136 } 137 138 private void recordFailure(final Table table, final Put put, final long keyBase, final long start, 139 IOException e) { 140 failedKeySet.add(keyBase); 141 String exceptionInfo; 142 if (e instanceof RetriesExhaustedWithDetailsException) { 143 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; 144 exceptionInfo = aggEx.getExhaustiveDescription(); 145 } else { 146 StringWriter stackWriter = new StringWriter(); 147 PrintWriter pw = new PrintWriter(stackWriter); 148 e.printStackTrace(pw); 149 pw.flush(); 150 exceptionInfo = StringUtils.stringifyException(e); 151 } 152 LOG.error("Failed to insert: " + keyBase + " after " 153 + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: " 154 + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + exceptionInfo); 155 } 156}