001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.security.PrivilegedExceptionAction;
022import java.util.HashMap;
023import java.util.Map;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Get;
027import org.apache.hadoop.hbase.client.Result;
028import org.apache.hadoop.hbase.client.Table;
029import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
030import org.apache.hadoop.hbase.security.User;
031import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
032import org.apache.hadoop.security.UserGroupInformation;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * A MultiThreadReader that helps to work with ACL
038 */
039public class MultiThreadedReaderWithACL extends MultiThreadedReader {
040  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReaderWithACL.class);
041
042  private static final String COMMA = ",";
043  /**
044   * Maps user with Table instance. Because the table instance has to be created per user inorder to
045   * work in that user's context
046   */
047  private Map<String, Table> userVsTable = new HashMap<>();
048  private Map<String, User> users = new HashMap<>();
049  private String[] userNames;
050
051  public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf,
052    TableName tableName, double verifyPercent, String userNames) throws IOException {
053    super(dataGen, conf, tableName, verifyPercent);
054    this.userNames = userNames.split(COMMA);
055  }
056
057  @Override
058  protected void addReaderThreads(int numThreads) throws IOException {
059    for (int i = 0; i < numThreads; ++i) {
060      HBaseReaderThread reader = new HBaseReaderThreadWithACL(i);
061      readers.add(reader);
062    }
063  }
064
065  public class HBaseReaderThreadWithACL extends HBaseReaderThread {
066
067    public HBaseReaderThreadWithACL(int readerId) throws IOException {
068      super(readerId);
069    }
070
071    @Override
072    protected Table createTable() throws IOException {
073      return null;
074    }
075
076    @Override
077    protected void closeTable() {
078      for (Table table : userVsTable.values()) {
079        try {
080          table.close();
081        } catch (Exception e) {
082          LOG.error("Error while closing the table " + table.getName(), e);
083        }
084      }
085    }
086
087    @Override
088    public void queryKey(final Get get, final boolean verify, final long keyToRead)
089      throws IOException {
090      final String rowKey = Bytes.toString(get.getRow());
091
092      // read the data
093      final long start = System.nanoTime();
094      PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
095        @Override
096        public Object run() throws Exception {
097          Table localTable = null;
098          try {
099            Result result = null;
100            int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
101            int mod = ((int) keyToRead % userNames.length);
102            if (userVsTable.get(userNames[mod]) == null) {
103              localTable = connection.getTable(tableName);
104              userVsTable.put(userNames[mod], localTable);
105              result = localTable.get(get);
106            } else {
107              localTable = userVsTable.get(userNames[mod]);
108              result = localTable.get(get);
109            }
110            boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0);
111            long end = System.nanoTime();
112            verifyResultsAndUpdateMetrics(verify, get, end - start, result, localTable,
113              isNullExpected);
114          } catch (IOException e) {
115            recordFailure(keyToRead);
116          }
117          return null;
118        }
119      };
120      if (userNames != null && userNames.length > 0) {
121        int mod = ((int) keyToRead % userNames.length);
122        User user;
123        UserGroupInformation realUserUgi;
124        if (!users.containsKey(userNames[mod])) {
125          if (User.isHBaseSecurityEnabled(conf)) {
126            realUserUgi = HBaseKerberosUtils.loginAndReturnUGI(conf, userNames[mod]);
127          } else {
128            realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]);
129          }
130          user = User.create(realUserUgi);
131          users.put(userNames[mod], user);
132        } else {
133          user = users.get(userNames[mod]);
134        }
135        try {
136          user.runAs(action);
137        } catch (Exception e) {
138          recordFailure(keyToRead);
139        }
140      }
141    }
142
143    private void recordFailure(final long keyToRead) {
144      numReadFailures.addAndGet(1);
145      LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", "
146        + "time from start: " + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
147    }
148  }
149
150}