001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.security.PrivilegedExceptionAction; 022import java.util.HashMap; 023import java.util.Map; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.Get; 027import org.apache.hadoop.hbase.client.Result; 028import org.apache.hadoop.hbase.client.Table; 029import org.apache.hadoop.hbase.security.User; 030import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 031import org.apache.hadoop.security.UserGroupInformation; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * A MultiThreadReader that helps to work with ACL 038 */ 039@InterfaceAudience.Private 040public class MultiThreadedReaderWithACL extends MultiThreadedReader { 041 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReaderWithACL.class); 042 043 private static final String COMMA = ","; 044 /** 045 * Maps user with Table instance. Because the table instance has to be created per user inorder to 046 * work in that user's context 047 */ 048 private Map<String, Table> userVsTable = new HashMap<>(); 049 private Map<String, User> users = new HashMap<>(); 050 private String[] userNames; 051 052 public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf, 053 TableName tableName, double verifyPercent, String userNames) throws IOException { 054 super(dataGen, conf, tableName, verifyPercent); 055 this.userNames = userNames.split(COMMA); 056 } 057 058 @Override 059 protected void addReaderThreads(int numThreads) throws IOException { 060 for (int i = 0; i < numThreads; ++i) { 061 HBaseReaderThread reader = new HBaseReaderThreadWithACL(i); 062 readers.add(reader); 063 } 064 } 065 066 public class HBaseReaderThreadWithACL extends HBaseReaderThread { 067 068 public HBaseReaderThreadWithACL(int readerId) throws IOException { 069 super(readerId); 070 } 071 072 @Override 073 protected Table createTable() throws IOException { 074 return null; 075 } 076 077 @Override 078 protected void closeTable() { 079 for (Table table : userVsTable.values()) { 080 try { 081 table.close(); 082 } catch (Exception e) { 083 LOG.error("Error while closing the table " + table.getName(), e); 084 } 085 } 086 } 087 088 @Override 089 public void queryKey(final Get get, final boolean verify, final long keyToRead) 090 throws IOException { 091 // read the data 092 final long start = System.nanoTime(); 093 PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() { 094 @Override 095 public Object run() throws Exception { 096 Table localTable = null; 097 try { 098 Result result = null; 099 int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]); 100 int mod = ((int) keyToRead % userNames.length); 101 if (userVsTable.get(userNames[mod]) == null) { 102 localTable = connection.getTable(tableName); 103 userVsTable.put(userNames[mod], localTable); 104 result = localTable.get(get); 105 } else { 106 localTable = userVsTable.get(userNames[mod]); 107 result = localTable.get(get); 108 } 109 boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0); 110 long end = System.nanoTime(); 111 verifyResultsAndUpdateMetrics(verify, get, end - start, result, localTable, 112 isNullExpected); 113 } catch (IOException e) { 114 recordFailure(keyToRead); 115 } 116 return null; 117 } 118 }; 119 if (userNames != null && userNames.length > 0) { 120 int mod = ((int) keyToRead % userNames.length); 121 User user; 122 UserGroupInformation realUserUgi; 123 if (!users.containsKey(userNames[mod])) { 124 if (User.isHBaseSecurityEnabled(conf)) { 125 realUserUgi = KerberosUtils.loginAndReturnUGI(conf, userNames[mod]); 126 } else { 127 realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]); 128 } 129 user = User.create(realUserUgi); 130 users.put(userNames[mod], user); 131 } else { 132 user = users.get(userNames[mod]); 133 } 134 try { 135 user.runAs(action); 136 } catch (Exception e) { 137 recordFailure(keyToRead); 138 } 139 } 140 } 141 142 private void recordFailure(final long keyToRead) { 143 numReadFailures.addAndGet(1); 144 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", " 145 + "time from start: " + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms"); 146 } 147 } 148 149}