001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.PrintWriter;
022import java.io.StringWriter;
023import java.security.PrivilegedExceptionAction;
024import java.util.HashMap;
025import java.util.Map;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.Append;
029import org.apache.hadoop.hbase.client.Delete;
030import org.apache.hadoop.hbase.client.Get;
031import org.apache.hadoop.hbase.client.Increment;
032import org.apache.hadoop.hbase.client.Mutation;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Result;
035import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
038import org.apache.hadoop.hbase.security.User;
039import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
040import org.apache.hadoop.security.UserGroupInformation;
041import org.apache.hadoop.util.StringUtils;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
/**
 * A {@link MultiThreadedUpdater} that runs its reads and mutations as specific users, so that it
 * can be used against tables protected by ACLs.
 */
048public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
049  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdaterWithACL.class);
050  private final static String COMMA = ",";
051  private User userOwner;
052  /**
053   * Maps user with Table instance. Because the table instance has to be created per user inorder to
054   * work in that user's context
055   */
056  private Map<String, Table> userVsTable = new HashMap<>();
057  private Map<String, User> users = new HashMap<>();
058  private String[] userNames;
059
060  public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
061    TableName tableName, double updatePercent, User userOwner, String userNames)
062    throws IOException {
063    super(dataGen, conf, tableName, updatePercent);
064    this.userOwner = userOwner;
065    this.userNames = userNames.split(COMMA);
066  }
067
068  @Override
069  protected void addUpdaterThreads(int numThreads) throws IOException {
070    for (int i = 0; i < numThreads; ++i) {
071      HBaseUpdaterThread updater = new HBaseUpdaterThreadWithACL(i);
072      updaters.add(updater);
073    }
074  }
075
076  public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread {
077
078    private Table table;
079    private MutateAccessAction mutateAction = new MutateAccessAction();
080
081    public HBaseUpdaterThreadWithACL(int updaterId) throws IOException {
082      super(updaterId);
083    }
084
085    @Override
086    protected Table createTable() throws IOException {
087      return null;
088    }
089
090    @Override
091    protected void closeHTable() {
092      try {
093        if (table != null) {
094          table.close();
095        }
096        for (Table table : userVsTable.values()) {
097          try {
098            table.close();
099          } catch (Exception e) {
100            LOG.error("Error while closing the table " + table.getName(), e);
101          }
102        }
103      } catch (Exception e) {
104        LOG.error("Error while closing the HTable " + table.getName(), e);
105      }
106    }
107
108    @Override
109    protected Result getRow(final Get get, final long rowKeyBase, final byte[] cf) {
110      PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
111
112        @Override
113        public Object run() throws Exception {
114          Result res = null;
115          Table localTable = null;
116          try {
117            int mod = ((int) rowKeyBase % userNames.length);
118            if (userVsTable.get(userNames[mod]) == null) {
119              localTable = connection.getTable(tableName);
120              userVsTable.put(userNames[mod], localTable);
121              res = localTable.get(get);
122            } else {
123              localTable = userVsTable.get(userNames[mod]);
124              res = localTable.get(get);
125            }
126          } catch (IOException ie) {
127            LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow())
128              + "], column family = [" + Bytes.toString(cf) + "]", ie);
129          }
130          return res;
131        }
132      };
133
134      if (userNames != null && userNames.length > 0) {
135        int mod = ((int) rowKeyBase % userNames.length);
136        User user;
137        UserGroupInformation realUserUgi;
138        try {
139          if (!users.containsKey(userNames[mod])) {
140            if (User.isHBaseSecurityEnabled(conf)) {
141              realUserUgi = HBaseKerberosUtils.loginAndReturnUGI(conf, userNames[mod]);
142            } else {
143              realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]);
144            }
145            user = User.create(realUserUgi);
146            users.put(userNames[mod], user);
147          } else {
148            user = users.get(userNames[mod]);
149          }
150          Result result = (Result) user.runAs(action);
151          return result;
152        } catch (Exception ie) {
153          LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow())
154            + "], column family = [" + Bytes.toString(cf) + "]", ie);
155        }
156      }
157      // This means that no users were present
158      return null;
159    }
160
161    @Override
162    public void mutate(final Table table, Mutation m, final long keyBase, final byte[] row,
163      final byte[] cf, final byte[] q, final byte[] v) {
164      final long start = EnvironmentEdgeManager.currentTime();
165      try {
166        m = dataGenerator.beforeMutate(keyBase, m);
167        mutateAction.setMutation(m);
168        mutateAction.setCF(cf);
169        mutateAction.setRow(row);
170        mutateAction.setQualifier(q);
171        mutateAction.setValue(v);
172        mutateAction.setStartTime(start);
173        mutateAction.setKeyBase(keyBase);
174        userOwner.runAs(mutateAction);
175      } catch (IOException e) {
176        recordFailure(m, keyBase, start, e);
177      } catch (InterruptedException e) {
178        failedKeySet.add(keyBase);
179      }
180    }
181
182    class MutateAccessAction implements PrivilegedExceptionAction<Object> {
183      private Table table;
184      private long start;
185      private Mutation m;
186      private long keyBase;
187      private byte[] row;
188      private byte[] cf;
189      private byte[] q;
190      private byte[] v;
191
192      public MutateAccessAction() {
193
194      }
195
196      public void setStartTime(final long start) {
197        this.start = start;
198      }
199
200      public void setMutation(final Mutation m) {
201        this.m = m;
202      }
203
204      public void setRow(final byte[] row) {
205        this.row = row;
206      }
207
208      public void setCF(final byte[] cf) {
209        this.cf = cf;
210      }
211
212      public void setQualifier(final byte[] q) {
213        this.q = q;
214      }
215
216      public void setValue(final byte[] v) {
217        this.v = v;
218      }
219
220      public void setKeyBase(final long keyBase) {
221        this.keyBase = keyBase;
222      }
223
224      @Override
225      public Object run() throws Exception {
226        try {
227          if (table == null) {
228            table = connection.getTable(tableName);
229          }
230          if (m instanceof Increment) {
231            table.increment((Increment) m);
232          } else if (m instanceof Append) {
233            table.append((Append) m);
234          } else if (m instanceof Put) {
235            table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m);
236          } else if (m instanceof Delete) {
237            table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m);
238          } else {
239            throw new IllegalArgumentException(
240              "unsupported mutation " + m.getClass().getSimpleName());
241          }
242          totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
243        } catch (IOException e) {
244          recordFailure(m, keyBase, start, e);
245        }
246        return null;
247      }
248    }
249
250    private void recordFailure(final Mutation m, final long keyBase, final long start,
251      IOException e) {
252      failedKeySet.add(keyBase);
253      String exceptionInfo;
254      if (e instanceof RetriesExhaustedWithDetailsException) {
255        RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
256        exceptionInfo = aggEx.getExhaustiveDescription();
257      } else {
258        StringWriter stackWriter = new StringWriter();
259        PrintWriter pw = new PrintWriter(stackWriter);
260        e.printStackTrace(pw);
261        pw.flush();
262        exceptionInfo = StringUtils.stringifyException(e);
263      }
264      LOG.error("Failed to mutate: " + keyBase + " after "
265        + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: "
266        + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo);
267    }
268  }
269}