001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.PrintWriter; 022import java.io.StringWriter; 023import java.security.PrivilegedExceptionAction; 024import java.util.HashMap; 025import java.util.Map; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.client.Append; 029import org.apache.hadoop.hbase.client.Delete; 030import org.apache.hadoop.hbase.client.Get; 031import org.apache.hadoop.hbase.client.Increment; 032import org.apache.hadoop.hbase.client.Mutation; 033import org.apache.hadoop.hbase.client.Put; 034import org.apache.hadoop.hbase.client.Result; 035import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.security.User; 038import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 039import org.apache.hadoop.security.UserGroupInformation; 040import org.apache.hadoop.util.StringUtils; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * A MultiThreadUpdater that helps to work with ACL 047 */ 048@InterfaceAudience.Private 049public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater { 050 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdaterWithACL.class); 051 private final static String COMMA = ","; 052 private User userOwner; 053 /** 054 * Maps user with Table instance. Because the table instance has to be created per user inorder to 055 * work in that user's context 056 */ 057 private Map<String, Table> userVsTable = new HashMap<>(); 058 private Map<String, User> users = new HashMap<>(); 059 private String[] userNames; 060 061 public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf, 062 TableName tableName, double updatePercent, User userOwner, String userNames) 063 throws IOException { 064 super(dataGen, conf, tableName, updatePercent); 065 this.userOwner = userOwner; 066 this.userNames = userNames.split(COMMA); 067 } 068 069 @Override 070 protected void addUpdaterThreads(int numThreads) throws IOException { 071 for (int i = 0; i < numThreads; ++i) { 072 HBaseUpdaterThread updater = new HBaseUpdaterThreadWithACL(i); 073 updaters.add(updater); 074 } 075 } 076 077 public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread { 078 private MutateAccessAction mutateAction = new MutateAccessAction(); 079 080 public HBaseUpdaterThreadWithACL(int updaterId) throws IOException { 081 super(updaterId); 082 } 083 084 @Override 085 protected Table createTable() throws IOException { 086 return null; 087 } 088 089 @Override 090 protected void closeHTable() { 091 try { 092 if (table != null) { 093 table.close(); 094 } 095 for (Table table : userVsTable.values()) { 096 try { 097 table.close(); 098 } catch (Exception e) { 099 LOG.error("Error while closing the table " + table.getName(), e); 100 } 101 } 102 } catch (Exception e) { 103 LOG.error("Error while closing the HTable " + table.getName(), e); 104 } 105 } 106 107 @Override 108 protected Result getRow(final Get get, final long rowKeyBase, final byte[] cf) { 109 PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() { 110 111 @Override 112 public Object run() throws Exception { 113 Result res = null; 114 Table localTable = null; 115 try { 116 int mod = ((int) rowKeyBase % userNames.length); 117 if (userVsTable.get(userNames[mod]) == null) { 118 localTable = connection.getTable(tableName); 119 userVsTable.put(userNames[mod], localTable); 120 res = localTable.get(get); 121 } else { 122 localTable = userVsTable.get(userNames[mod]); 123 res = localTable.get(get); 124 } 125 } catch (IOException ie) { 126 LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow()) 127 + "], column family = [" + Bytes.toString(cf) + "]", ie); 128 } 129 return res; 130 } 131 }; 132 133 if (userNames != null && userNames.length > 0) { 134 int mod = ((int) rowKeyBase % userNames.length); 135 User user; 136 UserGroupInformation realUserUgi; 137 try { 138 if (!users.containsKey(userNames[mod])) { 139 if (User.isHBaseSecurityEnabled(conf)) { 140 realUserUgi = KerberosUtils.loginAndReturnUGI(conf, userNames[mod]); 141 } else { 142 realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]); 143 } 144 user = User.create(realUserUgi); 145 users.put(userNames[mod], user); 146 } else { 147 user = users.get(userNames[mod]); 148 } 149 Result result = (Result) user.runAs(action); 150 return result; 151 } catch (Exception ie) { 152 LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow()) 153 + "], column family = [" + Bytes.toString(cf) + "]", ie); 154 } 155 } 156 // This means that no users were present 157 return null; 158 } 159 160 @Override 161 public void mutate(final Table table, Mutation m, final long keyBase, final byte[] row, 162 final byte[] cf, final byte[] q, final byte[] v) { 163 final long start = EnvironmentEdgeManager.currentTime(); 164 try { 165 m = dataGenerator.beforeMutate(keyBase, m); 166 mutateAction.setMutation(m); 167 mutateAction.setCF(cf); 168 mutateAction.setRow(row); 169 mutateAction.setQualifier(q); 170 mutateAction.setValue(v); 171 mutateAction.setStartTime(start); 172 mutateAction.setKeyBase(keyBase); 173 userOwner.runAs(mutateAction); 174 } catch (IOException e) { 175 recordFailure(m, keyBase, start, e); 176 } catch (InterruptedException e) { 177 failedKeySet.add(keyBase); 178 } 179 } 180 181 class MutateAccessAction implements PrivilegedExceptionAction<Object> { 182 private Table table; 183 private long start; 184 private Mutation m; 185 private long keyBase; 186 private byte[] row; 187 private byte[] cf; 188 private byte[] q; 189 private byte[] v; 190 191 public MutateAccessAction() { 192 193 } 194 195 public void setStartTime(final long start) { 196 this.start = start; 197 } 198 199 public void setMutation(final Mutation m) { 200 this.m = m; 201 } 202 203 public void setRow(final byte[] row) { 204 this.row = row; 205 } 206 207 public void setCF(final byte[] cf) { 208 this.cf = cf; 209 } 210 211 public void setQualifier(final byte[] q) { 212 this.q = q; 213 } 214 215 public void setValue(final byte[] v) { 216 this.v = v; 217 } 218 219 public void setKeyBase(final long keyBase) { 220 this.keyBase = keyBase; 221 } 222 223 @Override 224 public Object run() throws Exception { 225 try { 226 if (table == null) { 227 table = connection.getTable(tableName); 228 } 229 if (m instanceof Increment) { 230 table.increment((Increment) m); 231 } else if (m instanceof Append) { 232 table.append((Append) m); 233 } else if (m instanceof Put) { 234 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m); 235 } else if (m instanceof Delete) { 236 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m); 237 } else { 238 throw new IllegalArgumentException( 239 "unsupported mutation " + m.getClass().getSimpleName()); 240 } 241 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start); 242 } catch (IOException e) { 243 recordFailure(m, keyBase, start, e); 244 } 245 return null; 246 } 247 } 248 249 private void recordFailure(final Mutation m, final long keyBase, final long start, 250 IOException e) { 251 failedKeySet.add(keyBase); 252 String exceptionInfo; 253 if (e instanceof RetriesExhaustedWithDetailsException) { 254 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; 255 exceptionInfo = aggEx.getExhaustiveDescription(); 256 } else { 257 StringWriter stackWriter = new StringWriter(); 258 PrintWriter pw = new PrintWriter(stackWriter); 259 e.printStackTrace(pw); 260 pw.flush(); 261 exceptionInfo = StringUtils.stringifyException(e); 262 } 263 LOG.error("Failed to mutate: " + keyBase + " after " 264 + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: " 265 + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo); 266 } 267 } 268}