001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.util;
018
019import java.io.IOException;
020import java.security.PrivilegedExceptionAction;
021import java.util.HashMap;
022import java.util.Map;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Get;
027import org.apache.hadoop.hbase.client.Result;
028import org.apache.hadoop.hbase.client.Table;
029import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
030import org.apache.hadoop.hbase.security.User;
031import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
032import org.apache.hadoop.security.UserGroupInformation;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * A MultiThreadReader that helps to work with ACL
038 */
039public class MultiThreadedReaderWithACL extends MultiThreadedReader {
040  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReaderWithACL.class);
041
042  private static final String COMMA = ",";
043  /**
044   * Maps user with Table instance. Because the table instance has to be created
045   * per user inorder to work in that user's context
046   */
047  private Map<String, Table> userVsTable = new HashMap<>();
048  private Map<String, User> users = new HashMap<>();
049  private String[] userNames;
050
051  public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf,
052      TableName tableName, double verifyPercent, String userNames) throws IOException {
053    super(dataGen, conf, tableName, verifyPercent);
054    this.userNames = userNames.split(COMMA);
055  }
056
057  @Override
058  protected void addReaderThreads(int numThreads) throws IOException {
059    for (int i = 0; i < numThreads; ++i) {
060      HBaseReaderThread reader = new HBaseReaderThreadWithACL(i);
061      readers.add(reader);
062    }
063  }
064
065  public class HBaseReaderThreadWithACL extends HBaseReaderThread {
066
067    public HBaseReaderThreadWithACL(int readerId) throws IOException {
068      super(readerId);
069    }
070
071    @Override
072    protected Table createTable() throws IOException {
073      return null;
074    }
075
076    @Override
077    protected void closeTable() {
078      for (Table table : userVsTable.values()) {
079        try {
080          table.close();
081        } catch (Exception e) {
082          LOG.error("Error while closing the table " + table.getName(), e);
083        }
084      }
085    }
086
087    @Override
088    public void queryKey(final Get get, final boolean verify, final long keyToRead)
089        throws IOException {
090      final String rowKey = Bytes.toString(get.getRow());
091
092      // read the data
093      final long start = System.nanoTime();
094      PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
095        @Override
096        public Object run() throws Exception {
097          Table localTable = null;
098          try {
099            Result result = null;
100            int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
101            int mod = ((int) keyToRead % userNames.length);
102            if (userVsTable.get(userNames[mod]) == null) {
103              localTable = connection.getTable(tableName);
104              userVsTable.put(userNames[mod], localTable);
105              result = localTable.get(get);
106            } else {
107              localTable = userVsTable.get(userNames[mod]);
108              result = localTable.get(get);
109            }
110            boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0);
111            long end = System.nanoTime();
112            verifyResultsAndUpdateMetrics(verify, get, end - start, result, localTable, isNullExpected);
113          } catch (IOException e) {
114            recordFailure(keyToRead);
115          }
116          return null;
117        }
118      };
119      if (userNames != null && userNames.length > 0) {
120        int mod = ((int) keyToRead % userNames.length);
121        User user;
122        UserGroupInformation realUserUgi;
123        if(!users.containsKey(userNames[mod])) {
124          if(User.isHBaseSecurityEnabled(conf)) {
125            realUserUgi = HBaseKerberosUtils.loginAndReturnUGI(conf, userNames[mod]);
126          } else {
127            realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]);
128          }
129          user = User.create(realUserUgi);
130          users.put(userNames[mod], user);
131        } else {
132          user = users.get(userNames[mod]);
133        }
134        try {
135          user.runAs(action);
136        } catch (Exception e) {
137          recordFailure(keyToRead);
138        }
139      }
140    }
141
142    private void recordFailure(final long keyToRead) {
143      numReadFailures.addAndGet(1);
144      LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", "
145          + "time from start: " + (System.currentTimeMillis() - startTimeMs) + " ms");
146    }
147  }
148
149}