001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.PrintWriter;
022import java.io.StringWriter;
023import java.security.PrivilegedExceptionAction;
024import java.util.HashMap;
025import java.util.Map;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.Append;
029import org.apache.hadoop.hbase.client.Delete;
030import org.apache.hadoop.hbase.client.Get;
031import org.apache.hadoop.hbase.client.Increment;
032import org.apache.hadoop.hbase.client.Mutation;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Result;
035import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.security.User;
038import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
039import org.apache.hadoop.security.UserGroupInformation;
040import org.apache.hadoop.util.StringUtils;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * A MultiThreadUpdater that helps to work with ACL
047 */
048@InterfaceAudience.Private
049public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
050  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdaterWithACL.class);
051  private final static String COMMA = ",";
052  private User userOwner;
053  /**
054   * Maps user with Table instance. Because the table instance has to be created per user inorder to
055   * work in that user's context
056   */
057  private Map<String, Table> userVsTable = new HashMap<>();
058  private Map<String, User> users = new HashMap<>();
059  private String[] userNames;
060
061  public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
062    TableName tableName, double updatePercent, User userOwner, String userNames)
063    throws IOException {
064    super(dataGen, conf, tableName, updatePercent);
065    this.userOwner = userOwner;
066    this.userNames = userNames.split(COMMA);
067  }
068
069  @Override
070  protected void addUpdaterThreads(int numThreads) throws IOException {
071    for (int i = 0; i < numThreads; ++i) {
072      HBaseUpdaterThread updater = new HBaseUpdaterThreadWithACL(i);
073      updaters.add(updater);
074    }
075  }
076
077  public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread {
078    private MutateAccessAction mutateAction = new MutateAccessAction();
079
080    public HBaseUpdaterThreadWithACL(int updaterId) throws IOException {
081      super(updaterId);
082    }
083
084    @Override
085    protected Table createTable() throws IOException {
086      return null;
087    }
088
089    @Override
090    protected void closeHTable() {
091      try {
092        if (table != null) {
093          table.close();
094        }
095        for (Table table : userVsTable.values()) {
096          try {
097            table.close();
098          } catch (Exception e) {
099            LOG.error("Error while closing the table " + table.getName(), e);
100          }
101        }
102      } catch (Exception e) {
103        LOG.error("Error while closing the HTable " + table.getName(), e);
104      }
105    }
106
107    @Override
108    protected Result getRow(final Get get, final long rowKeyBase, final byte[] cf) {
109      PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
110
111        @Override
112        public Object run() throws Exception {
113          Result res = null;
114          Table localTable = null;
115          try {
116            int mod = ((int) rowKeyBase % userNames.length);
117            if (userVsTable.get(userNames[mod]) == null) {
118              localTable = connection.getTable(tableName);
119              userVsTable.put(userNames[mod], localTable);
120              res = localTable.get(get);
121            } else {
122              localTable = userVsTable.get(userNames[mod]);
123              res = localTable.get(get);
124            }
125          } catch (IOException ie) {
126            LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow())
127              + "], column family = [" + Bytes.toString(cf) + "]", ie);
128          }
129          return res;
130        }
131      };
132
133      if (userNames != null && userNames.length > 0) {
134        int mod = ((int) rowKeyBase % userNames.length);
135        User user;
136        UserGroupInformation realUserUgi;
137        try {
138          if (!users.containsKey(userNames[mod])) {
139            if (User.isHBaseSecurityEnabled(conf)) {
140              realUserUgi = KerberosUtils.loginAndReturnUGI(conf, userNames[mod]);
141            } else {
142              realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]);
143            }
144            user = User.create(realUserUgi);
145            users.put(userNames[mod], user);
146          } else {
147            user = users.get(userNames[mod]);
148          }
149          Result result = (Result) user.runAs(action);
150          return result;
151        } catch (Exception ie) {
152          LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow())
153            + "], column family = [" + Bytes.toString(cf) + "]", ie);
154        }
155      }
156      // This means that no users were present
157      return null;
158    }
159
160    @Override
161    public void mutate(final Table table, Mutation m, final long keyBase, final byte[] row,
162      final byte[] cf, final byte[] q, final byte[] v) {
163      final long start = EnvironmentEdgeManager.currentTime();
164      try {
165        m = dataGenerator.beforeMutate(keyBase, m);
166        mutateAction.setMutation(m);
167        mutateAction.setCF(cf);
168        mutateAction.setRow(row);
169        mutateAction.setQualifier(q);
170        mutateAction.setValue(v);
171        mutateAction.setStartTime(start);
172        mutateAction.setKeyBase(keyBase);
173        userOwner.runAs(mutateAction);
174      } catch (IOException e) {
175        recordFailure(m, keyBase, start, e);
176      } catch (InterruptedException e) {
177        failedKeySet.add(keyBase);
178      }
179    }
180
181    class MutateAccessAction implements PrivilegedExceptionAction<Object> {
182      private Table table;
183      private long start;
184      private Mutation m;
185      private long keyBase;
186      private byte[] row;
187      private byte[] cf;
188      private byte[] q;
189      private byte[] v;
190
191      public MutateAccessAction() {
192
193      }
194
195      public void setStartTime(final long start) {
196        this.start = start;
197      }
198
199      public void setMutation(final Mutation m) {
200        this.m = m;
201      }
202
203      public void setRow(final byte[] row) {
204        this.row = row;
205      }
206
207      public void setCF(final byte[] cf) {
208        this.cf = cf;
209      }
210
211      public void setQualifier(final byte[] q) {
212        this.q = q;
213      }
214
215      public void setValue(final byte[] v) {
216        this.v = v;
217      }
218
219      public void setKeyBase(final long keyBase) {
220        this.keyBase = keyBase;
221      }
222
223      @Override
224      public Object run() throws Exception {
225        try {
226          if (table == null) {
227            table = connection.getTable(tableName);
228          }
229          if (m instanceof Increment) {
230            table.increment((Increment) m);
231          } else if (m instanceof Append) {
232            table.append((Append) m);
233          } else if (m instanceof Put) {
234            table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m);
235          } else if (m instanceof Delete) {
236            table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m);
237          } else {
238            throw new IllegalArgumentException(
239              "unsupported mutation " + m.getClass().getSimpleName());
240          }
241          totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
242        } catch (IOException e) {
243          recordFailure(m, keyBase, start, e);
244        }
245        return null;
246      }
247    }
248
249    private void recordFailure(final Mutation m, final long keyBase, final long start,
250      IOException e) {
251      failedKeySet.add(keyBase);
252      String exceptionInfo;
253      if (e instanceof RetriesExhaustedWithDetailsException) {
254        RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
255        exceptionInfo = aggEx.getExhaustiveDescription();
256      } else {
257        StringWriter stackWriter = new StringWriter();
258        PrintWriter pw = new PrintWriter(stackWriter);
259        e.printStackTrace(pw);
260        pw.flush();
261        exceptionInfo = StringUtils.stringifyException(e);
262      }
263      LOG.error("Failed to mutate: " + keyBase + " after "
264        + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: "
265        + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo);
266    }
267  }
268}