001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with this 004 * work for additional information regarding copyright ownership. The ASF 005 * licenses this file to you under the Apache License, Version 2.0 (the 006 * "License"); you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 014 * License for the specific language governing permissions and limitations 015 * under the License. 016 */ 017package org.apache.hadoop.hbase.util; 018 019import java.io.IOException; 020import java.security.PrivilegedExceptionAction; 021import java.util.HashMap; 022import java.util.Map; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.Get; 027import org.apache.hadoop.hbase.client.Result; 028import org.apache.hadoop.hbase.client.Table; 029import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 030import org.apache.hadoop.hbase.security.User; 031import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 032import org.apache.hadoop.security.UserGroupInformation; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * A MultiThreadReader that helps to work with ACL 038 */ 039public class MultiThreadedReaderWithACL extends MultiThreadedReader { 040 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReaderWithACL.class); 041 042 private static final String COMMA = ","; 043 /** 044 * Maps user with Table instance. Because the table instance has to be created 045 * per user inorder to work in that user's context 046 */ 047 private Map<String, Table> userVsTable = new HashMap<>(); 048 private Map<String, User> users = new HashMap<>(); 049 private String[] userNames; 050 051 public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf, 052 TableName tableName, double verifyPercent, String userNames) throws IOException { 053 super(dataGen, conf, tableName, verifyPercent); 054 this.userNames = userNames.split(COMMA); 055 } 056 057 @Override 058 protected void addReaderThreads(int numThreads) throws IOException { 059 for (int i = 0; i < numThreads; ++i) { 060 HBaseReaderThread reader = new HBaseReaderThreadWithACL(i); 061 readers.add(reader); 062 } 063 } 064 065 public class HBaseReaderThreadWithACL extends HBaseReaderThread { 066 067 public HBaseReaderThreadWithACL(int readerId) throws IOException { 068 super(readerId); 069 } 070 071 @Override 072 protected Table createTable() throws IOException { 073 return null; 074 } 075 076 @Override 077 protected void closeTable() { 078 for (Table table : userVsTable.values()) { 079 try { 080 table.close(); 081 } catch (Exception e) { 082 LOG.error("Error while closing the table " + table.getName(), e); 083 } 084 } 085 } 086 087 @Override 088 public void queryKey(final Get get, final boolean verify, final long keyToRead) 089 throws IOException { 090 final String rowKey = Bytes.toString(get.getRow()); 091 092 // read the data 093 final long start = System.nanoTime(); 094 PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() { 095 @Override 096 public Object run() throws Exception { 097 Table localTable = null; 098 try { 099 Result result = null; 100 int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]); 101 int mod = ((int) keyToRead % userNames.length); 102 if (userVsTable.get(userNames[mod]) == null) { 103 localTable = connection.getTable(tableName); 104 userVsTable.put(userNames[mod], localTable); 105 result = localTable.get(get); 106 } else { 107 localTable = userVsTable.get(userNames[mod]); 108 result = localTable.get(get); 109 } 110 boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0); 111 long end = System.nanoTime(); 112 verifyResultsAndUpdateMetrics(verify, get, end - start, result, localTable, isNullExpected); 113 } catch (IOException e) { 114 recordFailure(keyToRead); 115 } 116 return null; 117 } 118 }; 119 if (userNames != null && userNames.length > 0) { 120 int mod = ((int) keyToRead % userNames.length); 121 User user; 122 UserGroupInformation realUserUgi; 123 if(!users.containsKey(userNames[mod])) { 124 if(User.isHBaseSecurityEnabled(conf)) { 125 realUserUgi = HBaseKerberosUtils.loginAndReturnUGI(conf, userNames[mod]); 126 } else { 127 realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]); 128 } 129 user = User.create(realUserUgi); 130 users.put(userNames[mod], user); 131 } else { 132 user = users.get(userNames[mod]); 133 } 134 try { 135 user.runAs(action); 136 } catch (Exception e) { 137 recordFailure(keyToRead); 138 } 139 } 140 } 141 142 private void recordFailure(final long keyToRead) { 143 numReadFailures.addAndGet(1); 144 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", " 145 + "time from start: " + (System.currentTimeMillis() - startTimeMs) + " ms"); 146 } 147 } 148 149}