001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.security.PrivilegedExceptionAction;
022import java.util.HashMap;
023import java.util.Map;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Get;
027import org.apache.hadoop.hbase.client.Result;
028import org.apache.hadoop.hbase.client.Table;
029import org.apache.hadoop.hbase.security.User;
030import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
031import org.apache.hadoop.security.UserGroupInformation;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * A MultiThreadReader that helps to work with ACL
038 */
039@InterfaceAudience.Private
040public class MultiThreadedReaderWithACL extends MultiThreadedReader {
041  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReaderWithACL.class);
042
043  private static final String COMMA = ",";
044  /**
045   * Maps user with Table instance. Because the table instance has to be created per user inorder to
046   * work in that user's context
047   */
048  private Map<String, Table> userVsTable = new HashMap<>();
049  private Map<String, User> users = new HashMap<>();
050  private String[] userNames;
051
052  public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf,
053    TableName tableName, double verifyPercent, String userNames) throws IOException {
054    super(dataGen, conf, tableName, verifyPercent);
055    this.userNames = userNames.split(COMMA);
056  }
057
058  @Override
059  protected void addReaderThreads(int numThreads) throws IOException {
060    for (int i = 0; i < numThreads; ++i) {
061      HBaseReaderThread reader = new HBaseReaderThreadWithACL(i);
062      readers.add(reader);
063    }
064  }
065
066  public class HBaseReaderThreadWithACL extends HBaseReaderThread {
067
068    public HBaseReaderThreadWithACL(int readerId) throws IOException {
069      super(readerId);
070    }
071
072    @Override
073    protected Table createTable() throws IOException {
074      return null;
075    }
076
077    @Override
078    protected void closeTable() {
079      for (Table table : userVsTable.values()) {
080        try {
081          table.close();
082        } catch (Exception e) {
083          LOG.error("Error while closing the table " + table.getName(), e);
084        }
085      }
086    }
087
088    @Override
089    public void queryKey(final Get get, final boolean verify, final long keyToRead)
090      throws IOException {
091      // read the data
092      final long start = System.nanoTime();
093      PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
094        @Override
095        public Object run() throws Exception {
096          Table localTable = null;
097          try {
098            Result result = null;
099            int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
100            int mod = ((int) keyToRead % userNames.length);
101            if (userVsTable.get(userNames[mod]) == null) {
102              localTable = connection.getTable(tableName);
103              userVsTable.put(userNames[mod], localTable);
104              result = localTable.get(get);
105            } else {
106              localTable = userVsTable.get(userNames[mod]);
107              result = localTable.get(get);
108            }
109            boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0);
110            long end = System.nanoTime();
111            verifyResultsAndUpdateMetrics(verify, get, end - start, result, localTable,
112              isNullExpected);
113          } catch (IOException e) {
114            recordFailure(keyToRead);
115          }
116          return null;
117        }
118      };
119      if (userNames != null && userNames.length > 0) {
120        int mod = ((int) keyToRead % userNames.length);
121        User user;
122        UserGroupInformation realUserUgi;
123        if (!users.containsKey(userNames[mod])) {
124          if (User.isHBaseSecurityEnabled(conf)) {
125            realUserUgi = KerberosUtils.loginAndReturnUGI(conf, userNames[mod]);
126          } else {
127            realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]);
128          }
129          user = User.create(realUserUgi);
130          users.put(userNames[mod], user);
131        } else {
132          user = users.get(userNames[mod]);
133        }
134        try {
135          user.runAs(action);
136        } catch (Exception e) {
137          recordFailure(keyToRead);
138        }
139      }
140    }
141
142    private void recordFailure(final long keyToRead) {
143      numReadFailures.addAndGet(1);
144      LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", "
145        + "time from start: " + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
146    }
147  }
148
149}