001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.PrintWriter;
022import java.io.StringWriter;
023import java.security.PrivilegedExceptionAction;
024import java.util.HashMap;
025import java.util.Map;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.Append;
030import org.apache.hadoop.hbase.client.Delete;
031import org.apache.hadoop.hbase.client.Get;
032import org.apache.hadoop.hbase.client.Increment;
033import org.apache.hadoop.hbase.client.Mutation;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
039import org.apache.hadoop.hbase.security.User;
040import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
041import org.apache.hadoop.security.UserGroupInformation;
042import org.apache.hadoop.util.StringUtils;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * A MultiThreadUpdater that helps to work with ACL
048 */
049public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
050  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdaterWithACL.class);
051  private final static String COMMA= ",";
052  private User userOwner;
053  /**
054   * Maps user with Table instance. Because the table instance has to be created
055   * per user inorder to work in that user's context
056   */
057  private Map<String, Table> userVsTable = new HashMap<>();
058  private Map<String, User> users = new HashMap<>();
059  private String[] userNames;
060
061  public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
062      TableName tableName, double updatePercent, User userOwner, String userNames)
063          throws IOException {
064    super(dataGen, conf, tableName, updatePercent);
065    this.userOwner = userOwner;
066    this.userNames = userNames.split(COMMA);
067  }
068
069  @Override
070  protected void addUpdaterThreads(int numThreads) throws IOException {
071    for (int i = 0; i < numThreads; ++i) {
072      HBaseUpdaterThread updater = new HBaseUpdaterThreadWithACL(i);
073      updaters.add(updater);
074    }
075  }
076
077  public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread {
078
079    private Table table;
080    private MutateAccessAction mutateAction = new MutateAccessAction();
081
082    public HBaseUpdaterThreadWithACL(int updaterId) throws IOException {
083      super(updaterId);
084    }
085
086    @Override
087    protected Table createTable() throws IOException {
088      return null;
089    }
090
091    @Override
092    protected void closeHTable() {
093      try {
094        if (table != null) {
095          table.close();
096        }
097        for (Table table : userVsTable.values()) {
098          try {
099            table.close();
100          } catch (Exception e) {
101            LOG.error("Error while closing the table " + table.getName(), e);
102          }
103        }
104      } catch (Exception e) {
105        LOG.error("Error while closing the HTable "+table.getName(), e);
106      }
107    }
108
109    @Override
110    protected Result getRow(final Get get, final long rowKeyBase, final byte[] cf) {
111      PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
112
113        @Override
114        public Object run() throws Exception {
115          Result res = null;
116          Table localTable = null;
117          try {
118            int mod = ((int) rowKeyBase % userNames.length);
119            if (userVsTable.get(userNames[mod]) == null) {
120              localTable = connection.getTable(tableName);
121              userVsTable.put(userNames[mod], localTable);
122              res = localTable.get(get);
123            } else {
124              localTable = userVsTable.get(userNames[mod]);
125              res = localTable.get(get);
126            }
127          } catch (IOException ie) {
128            LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow()) +
129                "], column family = [" + Bytes.toString(cf) + "]", ie);
130          }
131          return res;
132        }
133      };
134
135      if (userNames != null && userNames.length > 0) {
136        int mod = ((int) rowKeyBase % userNames.length);
137        User user;
138        UserGroupInformation realUserUgi;
139        try {
140          if (!users.containsKey(userNames[mod])) {
141            if (User.isHBaseSecurityEnabled(conf)) {
142              realUserUgi = HBaseKerberosUtils.loginAndReturnUGI(conf, userNames[mod]);
143            } else {
144              realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]);
145            }
146            user = User.create(realUserUgi);
147            users.put(userNames[mod], user);
148          } else {
149            user = users.get(userNames[mod]);
150          }
151          Result result = (Result) user.runAs(action);
152          return result;
153        } catch (Exception ie) {
154          LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow()) +
155              "], column family = [" + Bytes.toString(cf) + "]", ie);
156        }
157      }
158      // This means that no users were present
159      return null;
160    }
161
162    @Override
163    public void mutate(final Table table, Mutation m, final long keyBase, final byte[] row,
164        final byte[] cf, final byte[] q, final byte[] v) {
165      final long start = System.currentTimeMillis();
166      try {
167        m = dataGenerator.beforeMutate(keyBase, m);
168        mutateAction.setMutation(m);
169        mutateAction.setCF(cf);
170        mutateAction.setRow(row);
171        mutateAction.setQualifier(q);
172        mutateAction.setValue(v);
173        mutateAction.setStartTime(start);
174        mutateAction.setKeyBase(keyBase);
175        userOwner.runAs(mutateAction);
176      } catch (IOException e) {
177        recordFailure(m, keyBase, start, e);
178      } catch (InterruptedException e) {
179        failedKeySet.add(keyBase);
180      }
181    }
182
183    class MutateAccessAction implements PrivilegedExceptionAction<Object> {
184      private Table table;
185      private long start;
186      private Mutation m;
187      private long keyBase;
188      private byte[] row;
189      private byte[] cf;
190      private byte[] q;
191      private byte[] v;
192
193      public MutateAccessAction() {
194
195      }
196
197      public void setStartTime(final long start) {
198        this.start = start;
199      }
200
201      public void setMutation(final Mutation m) {
202        this.m = m;
203      }
204
205      public void setRow(final byte[] row) {
206        this.row = row;
207      }
208
209      public void setCF(final byte[] cf) {
210        this.cf = cf;
211      }
212
213      public void setQualifier(final byte[] q) {
214        this.q = q;
215      }
216
217      public void setValue(final byte[] v) {
218        this.v = v;
219      }
220
221      public void setKeyBase(final long keyBase) {
222        this.keyBase = keyBase;
223      }
224
225      @Override
226      public Object run() throws Exception {
227        try {
228          if (table == null) {
229            table = connection.getTable(tableName);
230          }
231          if (m instanceof Increment) {
232            table.increment((Increment) m);
233          } else if (m instanceof Append) {
234            table.append((Append) m);
235          } else if (m instanceof Put) {
236            table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m);
237          } else if (m instanceof Delete) {
238            table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m);
239          } else {
240            throw new IllegalArgumentException("unsupported mutation "
241                + m.getClass().getSimpleName());
242          }
243          totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
244        } catch (IOException e) {
245          recordFailure(m, keyBase, start, e);
246        }
247        return null;
248      }
249    }
250
251    private void recordFailure(final Mutation m, final long keyBase,
252        final long start, IOException e) {
253      failedKeySet.add(keyBase);
254      String exceptionInfo;
255      if (e instanceof RetriesExhaustedWithDetailsException) {
256        RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
257        exceptionInfo = aggEx.getExhaustiveDescription();
258      } else {
259        StringWriter stackWriter = new StringWriter();
260        PrintWriter pw = new PrintWriter(stackWriter);
261        e.printStackTrace(pw);
262        pw.flush();
263        exceptionInfo = StringUtils.stringifyException(e);
264      }
265      LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start)
266          + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
267          + exceptionInfo);
268    }
269  }
270}