001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.PrintWriter; 022import java.io.StringWriter; 023import java.security.PrivilegedExceptionAction; 024import java.util.HashMap; 025import java.util.Map; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.client.Append; 029import org.apache.hadoop.hbase.client.Delete; 030import org.apache.hadoop.hbase.client.Get; 031import org.apache.hadoop.hbase.client.Increment; 032import org.apache.hadoop.hbase.client.Mutation; 033import org.apache.hadoop.hbase.client.Put; 034import org.apache.hadoop.hbase.client.Result; 035import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 038import org.apache.hadoop.hbase.security.User; 039import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 040import org.apache.hadoop.security.UserGroupInformation; 041import org.apache.hadoop.util.StringUtils; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * A MultiThreadUpdater that helps to work with ACL 047 */ 048public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater { 049 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdaterWithACL.class); 050 private final static String COMMA = ","; 051 private User userOwner; 052 /** 053 * Maps user with Table instance. Because the table instance has to be created per user inorder to 054 * work in that user's context 055 */ 056 private Map<String, Table> userVsTable = new HashMap<>(); 057 private Map<String, User> users = new HashMap<>(); 058 private String[] userNames; 059 060 public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf, 061 TableName tableName, double updatePercent, User userOwner, String userNames) 062 throws IOException { 063 super(dataGen, conf, tableName, updatePercent); 064 this.userOwner = userOwner; 065 this.userNames = userNames.split(COMMA); 066 } 067 068 @Override 069 protected void addUpdaterThreads(int numThreads) throws IOException { 070 for (int i = 0; i < numThreads; ++i) { 071 HBaseUpdaterThread updater = new HBaseUpdaterThreadWithACL(i); 072 updaters.add(updater); 073 } 074 } 075 076 public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread { 077 078 private Table table; 079 private MutateAccessAction mutateAction = new MutateAccessAction(); 080 081 public HBaseUpdaterThreadWithACL(int updaterId) throws IOException { 082 super(updaterId); 083 } 084 085 @Override 086 protected Table createTable() throws IOException { 087 return null; 088 } 089 090 @Override 091 protected void closeHTable() { 092 try { 093 if (table != null) { 094 table.close(); 095 } 096 for (Table table : userVsTable.values()) { 097 try { 098 table.close(); 099 } catch (Exception e) { 100 LOG.error("Error while closing the table " + table.getName(), e); 101 } 102 } 103 } catch (Exception e) { 104 LOG.error("Error while closing the HTable " + table.getName(), e); 105 } 106 } 107 108 @Override 109 protected Result getRow(final Get get, final long rowKeyBase, final byte[] cf) { 110 PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() { 111 112 @Override 113 public Object run() throws Exception { 114 Result res = null; 115 Table localTable = null; 116 try { 117 int mod = ((int) rowKeyBase % userNames.length); 118 if (userVsTable.get(userNames[mod]) == null) { 119 localTable = connection.getTable(tableName); 120 userVsTable.put(userNames[mod], localTable); 121 res = localTable.get(get); 122 } else { 123 localTable = userVsTable.get(userNames[mod]); 124 res = localTable.get(get); 125 } 126 } catch (IOException ie) { 127 LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow()) 128 + "], column family = [" + Bytes.toString(cf) + "]", ie); 129 } 130 return res; 131 } 132 }; 133 134 if (userNames != null && userNames.length > 0) { 135 int mod = ((int) rowKeyBase % userNames.length); 136 User user; 137 UserGroupInformation realUserUgi; 138 try { 139 if (!users.containsKey(userNames[mod])) { 140 if (User.isHBaseSecurityEnabled(conf)) { 141 realUserUgi = HBaseKerberosUtils.loginAndReturnUGI(conf, userNames[mod]); 142 } else { 143 realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]); 144 } 145 user = User.create(realUserUgi); 146 users.put(userNames[mod], user); 147 } else { 148 user = users.get(userNames[mod]); 149 } 150 Result result = (Result) user.runAs(action); 151 return result; 152 } catch (Exception ie) { 153 LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow()) 154 + "], column family = [" + Bytes.toString(cf) + "]", ie); 155 } 156 } 157 // This means that no users were present 158 return null; 159 } 160 161 @Override 162 public void mutate(final Table table, Mutation m, final long keyBase, final byte[] row, 163 final byte[] cf, final byte[] q, final byte[] v) { 164 final long start = EnvironmentEdgeManager.currentTime(); 165 try { 166 m = dataGenerator.beforeMutate(keyBase, m); 167 mutateAction.setMutation(m); 168 mutateAction.setCF(cf); 169 mutateAction.setRow(row); 170 mutateAction.setQualifier(q); 171 mutateAction.setValue(v); 172 mutateAction.setStartTime(start); 173 mutateAction.setKeyBase(keyBase); 174 userOwner.runAs(mutateAction); 175 } catch (IOException e) { 176 recordFailure(m, keyBase, start, e); 177 } catch (InterruptedException e) { 178 failedKeySet.add(keyBase); 179 } 180 } 181 182 class MutateAccessAction implements PrivilegedExceptionAction<Object> { 183 private Table table; 184 private long start; 185 private Mutation m; 186 private long keyBase; 187 private byte[] row; 188 private byte[] cf; 189 private byte[] q; 190 private byte[] v; 191 192 public MutateAccessAction() { 193 194 } 195 196 public void setStartTime(final long start) { 197 this.start = start; 198 } 199 200 public void setMutation(final Mutation m) { 201 this.m = m; 202 } 203 204 public void setRow(final byte[] row) { 205 this.row = row; 206 } 207 208 public void setCF(final byte[] cf) { 209 this.cf = cf; 210 } 211 212 public void setQualifier(final byte[] q) { 213 this.q = q; 214 } 215 216 public void setValue(final byte[] v) { 217 this.v = v; 218 } 219 220 public void setKeyBase(final long keyBase) { 221 this.keyBase = keyBase; 222 } 223 224 @Override 225 public Object run() throws Exception { 226 try { 227 if (table == null) { 228 table = connection.getTable(tableName); 229 } 230 if (m instanceof Increment) { 231 table.increment((Increment) m); 232 } else if (m instanceof Append) { 233 table.append((Append) m); 234 } else if (m instanceof Put) { 235 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m); 236 } else if (m instanceof Delete) { 237 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m); 238 } else { 239 throw new IllegalArgumentException( 240 "unsupported mutation " + m.getClass().getSimpleName()); 241 } 242 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start); 243 } catch (IOException e) { 244 recordFailure(m, keyBase, start, e); 245 } 246 return null; 247 } 248 } 249 250 private void recordFailure(final Mutation m, final long keyBase, final long start, 251 IOException e) { 252 failedKeySet.add(keyBase); 253 String exceptionInfo; 254 if (e instanceof RetriesExhaustedWithDetailsException) { 255 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; 256 exceptionInfo = aggEx.getExhaustiveDescription(); 257 } else { 258 StringWriter stackWriter = new StringWriter(); 259 PrintWriter pw = new PrintWriter(stackWriter); 260 e.printStackTrace(pw); 261 pw.flush(); 262 exceptionInfo = StringUtils.stringifyException(e); 263 } 264 LOG.error("Failed to mutate: " + keyBase + " after " 265 + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: " 266 + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo); 267 } 268 } 269}