/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A MultiThreadUpdater that helps to work with ACL.
 * <p>
 * Reads are executed as one of the configured users (selected from the row key base), while
 * mutations are executed as the owner user supplied to the constructor.
 */
public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdaterWithACL.class);
  private static final String COMMA = ",";

  /** User under which every mutation is executed. */
  private final User userOwner;

  /**
   * Maps user with Table instance. Because the table instance has to be created
   * per user in order to work in that user's context.
   * <p>
   * NOTE(review): this map and {@link #users} are plain HashMaps held by the enclosing instance
   * and are read and written by every updater created in {@link #addUpdaterThreads(int)};
   * each updater's {@code closeHTable()} also closes all of these tables. Confirm whether
   * updaters run concurrently and, if so, whether this sharing is safe.
   */
  private final Map<String, Table> userVsTable = new HashMap<>();

  /** Caches the {@link User} created for each user name so it is only built once. */
  private final Map<String, User> users = new HashMap<>();

  /** User names obtained by splitting the constructor argument on commas. */
  private final String[] userNames;

  /**
   * @param dataGen       data generator, passed through to the superclass
   * @param conf          configuration, passed through to the superclass
   * @param tableName     table to update, passed through to the superclass
   * @param updatePercent update percentage, passed through to the superclass
   * @param userOwner     user under which mutations are executed
   * @param userNames     comma-separated list of user names used for reads
   * @throws IOException declared for the superclass constructor
   */
  public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
      TableName tableName, double updatePercent, User userOwner, String userNames)
      throws IOException {
    super(dataGen, conf, tableName, updatePercent);
    this.userOwner = userOwner;
    this.userNames = userNames.split(COMMA);
  }

  /**
   * Registers {@code numThreads} ACL-aware updaters, identified by their index.
   */
  @Override
  protected void addUpdaterThreads(int numThreads) throws IOException {
    for (int updaterId = 0; updaterId < numThreads; ++updaterId) {
      updaters.add(new HBaseUpdaterThreadWithACL(updaterId));
    }
  }

  /**
   * Picks the position in {@link #userNames} for a row key base.
   * <p>
   * The key is truncated to an int exactly as before, but {@link Math#floorMod(int, int)} keeps
   * the result non-negative, so a key whose truncated value is negative no longer produces an
   * out-of-range array index.
   */
  private int userIndexFor(long rowKeyBase) {
    return Math.floorMod((int) rowKeyBase, userNames.length);
  }

  /**
   * Updater that performs reads as a per-key user and mutations as the owner user.
   */
  public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread {

    // Never assigned anywhere in this file; kept because it is still passed to
    // getRegionDebugInfoSafe() and checked in closeHTable().
    private Table table;
    private MutateAccessAction mutateAction = new MutateAccessAction();

    public HBaseUpdaterThreadWithACL(int updaterId) throws IOException {
      super(updaterId);
    }

    /**
     * Returns {@code null}: tables are opened lazily inside the privileged actions instead.
     */
    @Override
    protected Table createTable() throws IOException {
      return null;
    }

    /**
     * Closes this updater's table (if any), the table opened by the mutate action, and every
     * per-user table. A failure to close one table is logged and does not prevent the remaining
     * tables from being closed.
     */
    @Override
    protected void closeHTable() {
      closeTable(table, "Error while closing the HTable ");
      // The mutate action opens its own table lazily; without this it was never closed.
      mutateAction.closeTable();
      for (Table userTable : userVsTable.values()) {
        closeTable(userTable, "Error while closing the table ");
      }
    }

    /** Closes {@code toClose} if it is non-null, logging (not propagating) any failure. */
    private void closeTable(Table toClose, String errorPrefix) {
      if (toClose == null) {
        return;
      }
      try {
        toClose.close();
      } catch (Exception e) {
        LOG.error(errorPrefix + toClose.getName(), e);
      }
    }

    /**
     * Reads a row as the user selected by {@code rowKeyBase}.
     *
     * @param get        the read to perform
     * @param rowKeyBase key base used to select which configured user performs the read
     * @param cf         column family, used only in the failure log message
     * @return the result, or {@code null} when no users are configured or the read failed
     */
    @Override
    protected Result getRow(final Get get, final long rowKeyBase, final byte[] cf) {
      if (userNames == null || userNames.length == 0) {
        // This means that no users were present
        return null;
      }
      final String userName = userNames[userIndexFor(rowKeyBase)];

      PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          try {
            // Reuse the table already opened for this user, opening it on first use.
            Table localTable = userVsTable.get(userName);
            if (localTable == null) {
              localTable = connection.getTable(tableName);
              userVsTable.put(userName, localTable);
            }
            return localTable.get(get);
          } catch (IOException ie) {
            LOG.warn("Failed to get the row for key = [{}], column family = [{}]",
              Bytes.toString(get.getRow()), Bytes.toString(cf), ie);
            return null;
          }
        }
      };

      try {
        User user = users.get(userName);
        if (user == null) {
          UserGroupInformation realUserUgi;
          if (User.isHBaseSecurityEnabled(conf)) {
            realUserUgi = HBaseKerberosUtils.loginAndReturnUGI(conf, userName);
          } else {
            realUserUgi = UserGroupInformation.createRemoteUser(userName);
          }
          user = User.create(realUserUgi);
          users.put(userName, user);
        }
        return (Result) user.runAs(action);
      } catch (Exception e) {
        LOG.warn("Failed to get the row for key = [{}], column family = [{}]",
          Bytes.toString(get.getRow()), Bytes.toString(cf), e);
      }
      return null;
    }

    /**
     * Applies a mutation as the owner user.
     * <p>
     * The {@code table} argument is not used; the mutate action works on a table it opens itself.
     *
     * @param table   ignored
     * @param m       mutation to apply; first passed through the data generator's
     *                {@code beforeMutate}
     * @param keyBase key base, recorded in the failed key set on failure
     * @param row     row used for the check-and-mutate condition of puts and deletes
     * @param cf      column family used for the check-and-mutate condition
     * @param q       qualifier used for the check-and-mutate condition
     * @param v       expected value used for the check-and-mutate condition
     */
    @Override
    public void mutate(final Table table, Mutation m, final long keyBase, final byte[] row,
        final byte[] cf, final byte[] q, final byte[] v) {
      final long start = System.currentTimeMillis();
      try {
        m = dataGenerator.beforeMutate(keyBase, m);
        mutateAction.setMutation(m);
        mutateAction.setCF(cf);
        mutateAction.setRow(row);
        mutateAction.setQualifier(q);
        mutateAction.setValue(v);
        mutateAction.setStartTime(start);
        mutateAction.setKeyBase(keyBase);
        userOwner.runAs(mutateAction);
      } catch (IOException e) {
        recordFailure(m, keyBase, start, e);
      } catch (InterruptedException e) {
        failedKeySet.add(keyBase);
        // Restore the interrupt status so the caller can observe the interruption.
        Thread.currentThread().interrupt();
      }
    }

    /**
     * Privileged action that applies the currently configured mutation. The setters are called
     * by {@code mutate} before each run.
     */
    class MutateAccessAction implements PrivilegedExceptionAction<Object> {
      /** Opened on the first run and reused afterwards. */
      private Table table;
      private long start;
      private Mutation m;
      private long keyBase;
      private byte[] row;
      private byte[] cf;
      private byte[] q;
      private byte[] v;

      public MutateAccessAction() {

      }

      public void setStartTime(final long start) {
        this.start = start;
      }

      public void setMutation(final Mutation m) {
        this.m = m;
      }

      public void setRow(final byte[] row) {
        this.row = row;
      }

      public void setCF(final byte[] cf) {
        this.cf = cf;
      }

      public void setQualifier(final byte[] q) {
        this.q = q;
      }

      public void setValue(final byte[] v) {
        this.v = v;
      }

      public void setKeyBase(final long keyBase) {
        this.keyBase = keyBase;
      }

      /**
       * Closes the table opened by {@link #run()}, if any, logging any failure. The reference is
       * cleared so that a later run opens a fresh table.
       */
      void closeTable() {
        Table opened = table;
        table = null;
        if (opened == null) {
          return;
        }
        try {
          opened.close();
        } catch (Exception e) {
          LOG.error("Error while closing the table " + opened.getName(), e);
        }
      }

      /**
       * Applies the configured mutation: increments and appends directly, puts and deletes via
       * check-and-mutate on the configured row/family/qualifier/value (the check result is not
       * inspected). An {@link IOException} is recorded as a failure rather than rethrown.
       *
       * @return always {@code null}
       * @throws IllegalArgumentException if the mutation is of an unsupported type
       */
      @Override
      public Object run() throws Exception {
        try {
          if (table == null) {
            table = connection.getTable(tableName);
          }
          if (m instanceof Increment) {
            table.increment((Increment) m);
          } else if (m instanceof Append) {
            table.append((Append) m);
          } else if (m instanceof Put) {
            table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m);
          } else if (m instanceof Delete) {
            table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m);
          } else {
            throw new IllegalArgumentException("unsupported mutation "
              + m.getClass().getSimpleName());
          }
          totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
        } catch (IOException e) {
          recordFailure(m, keyBase, start, e);
        }
        return null;
      }
    }

    /**
     * Adds {@code keyBase} to the failed key set and logs the failure together with region debug
     * information and a description of the exception.
     */
    private void recordFailure(final Mutation m, final long keyBase,
        final long start, IOException e) {
      failedKeySet.add(keyBase);
      final String exceptionInfo;
      if (e instanceof RetriesExhaustedWithDetailsException) {
        exceptionInfo = ((RetriesExhaustedWithDetailsException) e).getExhaustiveDescription();
      } else {
        exceptionInfo = StringUtils.stringifyException(e);
      }
      LOG.error("Failed to mutate: {} after {}ms; region information: {}; errors: {}",
        keyBase, System.currentTimeMillis() - start, getRegionDebugInfoSafe(table, m.getRow()),
        exceptionInfo);
    }
  }
}