001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.concurrent.ThreadLocalRandom;
022import java.util.concurrent.atomic.AtomicLong;
023import org.apache.commons.lang3.ArrayUtils;
024import org.apache.hadoop.hbase.HBaseTestingUtil;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.RegionInfo;
027import org.apache.hadoop.hbase.client.RegionInfoBuilder;
028import org.apache.hadoop.hbase.procedure2.Procedure;
029import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
030import org.apache.hadoop.hbase.procedure2.util.StringUtils;
031import org.apache.hadoop.hbase.util.AbstractHBaseTool;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034
035import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
036import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
037
/**
 * Tool to test performance of locks and queues in procedure scheduler independently from other
 * framework components. Inserts table and region operations in the scheduler, then polls them and
 * exercises their locks. Number of tables, regions and operations can be set using cli args.
 */
043public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBaseTool {
  // Shared testing utility; main() takes the tool's Configuration from it.
  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  // Command line options and defaults.
  // Every option is declared next to the DEFAULT_* constant that processOptions() passes as the
  // fallback value when reading that option.

  // -num_table: number of table names generated by setupOperations().
  public static final int DEFAULT_NUM_TABLES = 5;
  public static final Option NUM_TABLES_OPTION = new Option("num_table", true,
    "Number of tables to use for table operations. Default: " + DEFAULT_NUM_TABLES);
  // -regions_per_table: number of regions generated for each table by setupOperations().
  public static final int DEFAULT_REGIONS_PER_TABLE = 10;
  public static final Option REGIONS_PER_TABLE_OPTION = new Option("regions_per_table", true,
    "Total number of regions per table. Default: " + DEFAULT_REGIONS_PER_TABLE);
  // -num_ops: number of procedures to add to the scheduler and then poll back.
  public static final int DEFAULT_NUM_OPERATIONS = 10000000; // 10M
  public static final Option NUM_OPERATIONS_OPTION = new Option("num_ops", true,
    "Total number of operations to schedule. Default: " + DEFAULT_NUM_OPERATIONS);
  // -threads: number of worker threads started by doWork() for each phase (add, then poll).
  public static final int DEFAULT_NUM_THREADS = 10;
  public static final Option NUM_THREADS_OPTION = new Option("threads", true,
    "Number of procedure executor threads. Default: " + DEFAULT_NUM_THREADS);
  // -ops_type: selects which factories end up in 'ops'; setupOperations() accepts exactly
  // "table", "region" or "both".
  public static final String DEFAULT_OPS_TYPE = "both";
  public static final Option OPS_TYPE_OPTION = new Option("ops_type", true,
    "Type of operations to run. Value can be table/region/both. In case of 'both', "
      + "proportion of table:region ops is 1:regions_per_table. Default: " + DEFAULT_OPS_TYPE);
063
  // Effective settings. They start at the defaults and are overwritten by processOptions().
  private int numTables = DEFAULT_NUM_TABLES;
  private int regionsPerTable = DEFAULT_REGIONS_PER_TABLE;
  private int numOps = DEFAULT_NUM_OPERATIONS;
  private int numThreads = DEFAULT_NUM_THREADS;
  private String opsType = DEFAULT_OPS_TYPE;

  // Scheduler under test; created and started in doWork().
  private MasterProcedureScheduler procedureScheduler;
  // List of table/region procedures to schedule.
  // Filled in by setupOperations(); AddProcsWorker picks a random entry for every procedure.
  ProcedureFactory[] ops;
073
  /**
   * Using factory pattern to build a collection of operations which can be executed in an
   * abstract manner by worker threads.
   */
  private interface ProcedureFactory {
    /**
     * Creates a new procedure for the operation this factory represents.
     * @param procId id handed to the created procedure
     * @return the newly created procedure
     */
    Procedure newProcedure(long procId);
  }
079
080  private class RegionProcedure extends TestMasterProcedureScheduler.TestRegionProcedure {
081    RegionProcedure(long procId, RegionInfo hri) {
082      super(procId, hri.getTable(), TableOperationType.REGION_UNASSIGN, hri);
083    }
084
085    @Override
086    public LockState acquireLock(Void env) {
087      return procedureScheduler.waitRegions(this, getTableName(), getRegionInfo())
088        ? LockState.LOCK_EVENT_WAIT
089        : LockState.LOCK_ACQUIRED;
090    }
091
092    @Override
093    public void releaseLock(Void env) {
094      procedureScheduler.wakeRegions(this, getTableName(), getRegionInfo());
095    }
096  }
097
098  private class RegionProcedureFactory implements ProcedureFactory {
099    final RegionInfo hri;
100
101    RegionProcedureFactory(RegionInfo hri) {
102      this.hri = hri;
103    }
104
105    @Override
106    public Procedure newProcedure(long procId) {
107      return new RegionProcedure(procId, hri);
108    }
109  }
110
111  private class TableProcedure extends TestMasterProcedureScheduler.TestTableProcedure {
112
113    TableProcedure(long procId, TableName tableName) {
114      super(procId, tableName, TableOperationType.EDIT);
115    }
116
117    @Override
118    public LockState acquireLock(Void env) {
119      return procedureScheduler.waitTableExclusiveLock(this, getTableName())
120        ? LockState.LOCK_EVENT_WAIT
121        : LockState.LOCK_ACQUIRED;
122    }
123
124    @Override
125    public void releaseLock(Void env) {
126      procedureScheduler.wakeTableExclusiveLock(this, getTableName());
127      procedureScheduler.completionCleanup(this);
128    }
129  }
130
131  private class TableProcedureFactory implements ProcedureFactory {
132    final TableName tableName;
133
134    TableProcedureFactory(TableName tableName) {
135      this.tableName = tableName;
136    }
137
138    @Override
139    public Procedure newProcedure(long procId) {
140      return new TableProcedure(procId, tableName);
141    }
142  }
143
144  private void setupOperations() throws Exception {
145    // Create set of operations based on --ops_type command line argument.
146    final ProcedureFactory[] tableOps = new ProcedureFactory[numTables];
147    for (int i = 0; i < numTables; ++i) {
148      tableOps[i] = new TableProcedureFactory(TableName.valueOf("testTableLock-" + i));
149    }
150
151    final ProcedureFactory[] regionOps = new ProcedureFactory[numTables * regionsPerTable];
152    for (int i = 0; i < numTables; ++i) {
153      for (int j = 0; j < regionsPerTable; ++j) {
154        regionOps[i * regionsPerTable + j] = new RegionProcedureFactory(
155          RegionInfoBuilder.newBuilder(((TableProcedureFactory) tableOps[i]).tableName)
156            .setStartKey(Bytes.toBytes(j)).setEndKey(Bytes.toBytes(j + 1)).build());
157      }
158    }
159
160    if (opsType.equals("table")) {
161      System.out.println("Operations: table only");
162      ops = tableOps;
163    } else if (opsType.equals("region")) {
164      System.out.println("Operations: region only");
165      ops = regionOps;
166    } else if (opsType.equals("both")) {
167      System.out.println("Operations: both (table + region)");
168      ops = (ProcedureFactory[]) ArrayUtils.addAll(tableOps, regionOps);
169    } else {
170      throw new Exception("-ops_type should be one of table/region/both.");
171    }
172  }
173
174  @Override
175  protected void addOptions() {
176    addOption(NUM_TABLES_OPTION);
177    addOption(REGIONS_PER_TABLE_OPTION);
178    addOption(NUM_OPERATIONS_OPTION);
179    addOption(NUM_THREADS_OPTION);
180    addOption(OPS_TYPE_OPTION);
181  }
182
183  @Override
184  protected void processOptions(CommandLine cmd) {
185    numTables = getOptionAsInt(cmd, NUM_TABLES_OPTION.getOpt(), DEFAULT_NUM_TABLES);
186    regionsPerTable =
187      getOptionAsInt(cmd, REGIONS_PER_TABLE_OPTION.getOpt(), DEFAULT_REGIONS_PER_TABLE);
188    numOps = getOptionAsInt(cmd, NUM_OPERATIONS_OPTION.getOpt(), DEFAULT_NUM_OPERATIONS);
189    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
190    opsType = cmd.getOptionValue(OPS_TYPE_OPTION.getOpt(), DEFAULT_OPS_TYPE);
191  }
192
  /*******************
   * WORKERS
   *******************/

  // Source of procedure ids; every AddProcsWorker draws its next id from here.
  private final AtomicLong procIds = new AtomicLong(0);
  // Number of times a PollAndLockWorker got no procedure back from poll().
  private final AtomicLong yield = new AtomicLong(0);
  // Number of procedures for which a PollAndLockWorker acquired the lock (and then released it).
  private final AtomicLong completed = new AtomicLong(0);
200
201  private class AddProcsWorker extends Thread {
202    @Override
203    public void run() {
204      long procId = procIds.incrementAndGet();
205      int index;
206      while (procId <= numOps) {
207        index = ThreadLocalRandom.current().nextInt(ops.length);
208        procedureScheduler.addBack(ops[index].newProcedure(procId));
209        procId = procIds.incrementAndGet();
210      }
211    }
212  }
213
214  private class PollAndLockWorker extends Thread {
215    @Override
216    public void run() {
217      while (completed.get() < numOps) {
218        // With lock/unlock being ~100ns, and no other workload, 1000ns wait seams reasonable.
219        TestProcedure proc = (TestProcedure) procedureScheduler.poll(1000);
220        if (proc == null) {
221          yield.incrementAndGet();
222          continue;
223        }
224
225        switch (proc.acquireLock(null)) {
226          case LOCK_ACQUIRED:
227            completed.incrementAndGet();
228            proc.releaseLock(null);
229            break;
230          case LOCK_YIELD_WAIT:
231            break;
232          case LOCK_EVENT_WAIT:
233            break;
234        }
235        if (completed.get() % 100000 == 0) {
236          System.out.println("Completed " + completed.get() + " procedures.");
237        }
238      }
239    }
240  }
241
242  /**
243   * Starts the threads and waits for them to finish.
244   * @return time taken by threads to complete, in milliseconds.
245   */
246  long runThreads(Thread[] threads) throws Exception {
247    final long startTime = EnvironmentEdgeManager.currentTime();
248    for (Thread t : threads) {
249      t.start();
250    }
251    for (Thread t : threads) {
252      t.join();
253    }
254    return EnvironmentEdgeManager.currentTime() - startTime;
255  }
256
257  @Override
258  protected int doWork() throws Exception {
259    procedureScheduler = new MasterProcedureScheduler(pid -> null);
260    procedureScheduler.start();
261    setupOperations();
262
263    final Thread[] threads = new Thread[numThreads];
264    for (int i = 0; i < numThreads; ++i) {
265      threads[i] = new AddProcsWorker();
266    }
267    final long addBackTime = runThreads(threads);
268    System.out.println("Added " + numOps + " procedures to scheduler.");
269
270    for (int i = 0; i < numThreads; ++i) {
271      threads[i] = new PollAndLockWorker();
272    }
273    final long pollTime = runThreads(threads);
274    procedureScheduler.stop();
275
276    final float pollTimeSec = pollTime / 1000.0f;
277    final float addBackTimeSec = addBackTime / 1000.0f;
278    System.out.println("******************************************");
279    System.out.println("Time - addBack     : " + StringUtils.humanTimeDiff(addBackTime));
280    System.out.println("Ops/sec - addBack  : " + StringUtils.humanSize(numOps / addBackTimeSec));
281    System.out.println("Time - poll        : " + StringUtils.humanTimeDiff(pollTime));
282    System.out.println("Ops/sec - poll     : " + StringUtils.humanSize(numOps / pollTimeSec));
283    System.out.println("Num Operations     : " + numOps);
284    System.out.println();
285    System.out.println("Completed          : " + completed.get());
286    System.out.println("Yield              : " + yield.get());
287    System.out.println();
288    System.out.println("Num Tables         : " + numTables);
289    System.out.println("Regions per table  : " + regionsPerTable);
290    System.out.println("Operations type    : " + opsType);
291    System.out.println("Threads            : " + numThreads);
292    System.out.println("******************************************");
293    System.out.println("Raw format for scripts");
294    System.out.println(String.format(
295      "RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
296        + "num_yield=%s, time_addback_ms=%s, time_poll_ms=%s]",
297      NUM_OPERATIONS_OPTION.getOpt(), numOps, OPS_TYPE_OPTION.getOpt(), opsType,
298      NUM_TABLES_OPTION.getOpt(), numTables, REGIONS_PER_TABLE_OPTION.getOpt(), regionsPerTable,
299      NUM_THREADS_OPTION.getOpt(), numThreads, yield.get(), addBackTime, pollTime));
300    return 0;
301  }
302
303  public static void main(String[] args) throws IOException {
304    MasterProcedureSchedulerPerformanceEvaluation tool =
305      new MasterProcedureSchedulerPerformanceEvaluation();
306    tool.setConf(UTIL.getConfiguration());
307    tool.run(args);
308  }
309}