001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.security.PrivilegedExceptionAction; 022import java.util.HashMap; 023import java.util.Map; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.Get; 027import org.apache.hadoop.hbase.client.Result; 028import org.apache.hadoop.hbase.client.Table; 029import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 030import org.apache.hadoop.hbase.security.User; 031import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 032import org.apache.hadoop.security.UserGroupInformation; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * A MultiThreadReader that helps to work with ACL 038 */ 039public class MultiThreadedReaderWithACL extends MultiThreadedReader { 040 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReaderWithACL.class); 041 042 private static final String COMMA = ","; 043 /** 044 * Maps user with Table instance. Because the table instance has to be created per user inorder to 045 * work in that user's context 046 */ 047 private Map<String, Table> userVsTable = new HashMap<>(); 048 private Map<String, User> users = new HashMap<>(); 049 private String[] userNames; 050 051 public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf, 052 TableName tableName, double verifyPercent, String userNames) throws IOException { 053 super(dataGen, conf, tableName, verifyPercent); 054 this.userNames = userNames.split(COMMA); 055 } 056 057 @Override 058 protected void addReaderThreads(int numThreads) throws IOException { 059 for (int i = 0; i < numThreads; ++i) { 060 HBaseReaderThread reader = new HBaseReaderThreadWithACL(i); 061 readers.add(reader); 062 } 063 } 064 065 public class HBaseReaderThreadWithACL extends HBaseReaderThread { 066 067 public HBaseReaderThreadWithACL(int readerId) throws IOException { 068 super(readerId); 069 } 070 071 @Override 072 protected Table createTable() throws IOException { 073 return null; 074 } 075 076 @Override 077 protected void closeTable() { 078 for (Table table : userVsTable.values()) { 079 try { 080 table.close(); 081 } catch (Exception e) { 082 LOG.error("Error while closing the table " + table.getName(), e); 083 } 084 } 085 } 086 087 @Override 088 public void queryKey(final Get get, final boolean verify, final long keyToRead) 089 throws IOException { 090 final String rowKey = Bytes.toString(get.getRow()); 091 092 // read the data 093 final long start = System.nanoTime(); 094 PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() { 095 @Override 096 public Object run() throws Exception { 097 Table localTable = null; 098 try { 099 Result result = null; 100 int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]); 101 int mod = ((int) keyToRead % userNames.length); 102 if (userVsTable.get(userNames[mod]) == null) { 103 localTable = connection.getTable(tableName); 104 userVsTable.put(userNames[mod], localTable); 105 result = localTable.get(get); 106 } else { 107 localTable = userVsTable.get(userNames[mod]); 108 result = localTable.get(get); 109 } 110 boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0); 111 long end = System.nanoTime(); 112 verifyResultsAndUpdateMetrics(verify, get, end - start, result, localTable, 113 isNullExpected); 114 } catch (IOException e) { 115 recordFailure(keyToRead); 116 } 117 return null; 118 } 119 }; 120 if (userNames != null && userNames.length > 0) { 121 int mod = ((int) keyToRead % userNames.length); 122 User user; 123 UserGroupInformation realUserUgi; 124 if (!users.containsKey(userNames[mod])) { 125 if (User.isHBaseSecurityEnabled(conf)) { 126 realUserUgi = HBaseKerberosUtils.loginAndReturnUGI(conf, userNames[mod]); 127 } else { 128 realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]); 129 } 130 user = User.create(realUserUgi); 131 users.put(userNames[mod], user); 132 } else { 133 user = users.get(userNames[mod]); 134 } 135 try { 136 user.runAs(action); 137 } catch (Exception e) { 138 recordFailure(keyToRead); 139 } 140 } 141 } 142 143 private void recordFailure(final long keyToRead) { 144 numReadFailures.addAndGet(1); 145 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", " 146 + "time from start: " + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms"); 147 } 148 } 149 150}