001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertFalse;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022import static org.junit.jupiter.api.Assertions.assertNull;
023import static org.junit.jupiter.api.Assertions.assertSame;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.util.Optional;
029import java.util.concurrent.CountDownLatch;
030import org.apache.hadoop.hbase.Cell.Type;
031import org.apache.hadoop.hbase.CellBuilderFactory;
032import org.apache.hadoop.hbase.CellBuilderType;
033import org.apache.hadoop.hbase.HBaseTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
040import org.apache.hadoop.hbase.coprocessor.ObserverContext;
041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
043import org.apache.hadoop.hbase.coprocessor.RegionObserver;
044import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.junit.jupiter.api.AfterAll;
048import org.junit.jupiter.api.AfterEach;
049import org.junit.jupiter.api.BeforeAll;
050import org.junit.jupiter.api.BeforeEach;
051import org.junit.jupiter.api.Tag;
052import org.junit.jupiter.api.Test;
053
054/**
055 * Confirm that the function of FlushLifeCycleTracker is OK as we do not use it in our own code.
056 */
057@Tag(CoprocessorTests.TAG)
058@Tag(MediumTests.TAG)
059public class TestFlushLifeCycleTracker {
060
061  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
062
063  private static final TableName NAME =
064    TableName.valueOf(TestFlushLifeCycleTracker.class.getSimpleName());
065
066  private static final byte[] CF = Bytes.toBytes("CF");
067
068  private static final byte[] QUALIFIER = Bytes.toBytes("CQ");
069
070  private HRegion region;
071
072  private static FlushLifeCycleTracker TRACKER;
073
074  private static volatile CountDownLatch ARRIVE;
075
076  private static volatile CountDownLatch BLOCK;
077
078  public static final class FlushObserver implements RegionObserver, RegionCoprocessor {
079
080    @Override
081    public Optional<RegionObserver> getRegionObserver() {
082      return Optional.of(this);
083    }
084
085    @Override
086    public void preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c,
087      FlushLifeCycleTracker tracker) throws IOException {
088      if (TRACKER != null) {
089        assertSame(tracker, TRACKER);
090      }
091    }
092
093    @Override
094    public InternalScanner preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c,
095      Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
096      if (TRACKER != null) {
097        assertSame(tracker, TRACKER);
098      }
099      return scanner;
100    }
101
102    @Override
103    public void postFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c,
104      FlushLifeCycleTracker tracker) throws IOException {
105      if (TRACKER != null) {
106        assertSame(tracker, TRACKER);
107      }
108    }
109
110    @Override
111    public void postFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c, Store store,
112      StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {
113      if (TRACKER != null) {
114        assertSame(tracker, TRACKER);
115      }
116      // inject here so we can make a flush request to fail because of we already have a flush
117      // ongoing.
118      CountDownLatch arrive = ARRIVE;
119      if (arrive != null) {
120        arrive.countDown();
121        try {
122          BLOCK.await();
123        } catch (InterruptedException e) {
124          throw new InterruptedIOException();
125        }
126      }
127    }
128  }
129
130  private static final class Tracker implements FlushLifeCycleTracker {
131
132    private String reason;
133
134    private boolean beforeExecutionCalled;
135
136    private boolean afterExecutionCalled;
137
138    private boolean completed = false;
139
140    @Override
141    public synchronized void notExecuted(String reason) {
142      this.reason = reason;
143      completed = true;
144      notifyAll();
145    }
146
147    @Override
148    public void beforeExecution() {
149      this.beforeExecutionCalled = true;
150    }
151
152    @Override
153    public synchronized void afterExecution() {
154      this.afterExecutionCalled = true;
155      completed = true;
156      notifyAll();
157    }
158
159    public synchronized void await() throws InterruptedException {
160      while (!completed) {
161        wait();
162      }
163    }
164  }
165
166  @BeforeAll
167  public static void setUpBeforeClass() throws Exception {
168    UTIL.startMiniCluster(3);
169  }
170
171  @AfterAll
172  public static void tearDownAfterClass() throws Exception {
173    UTIL.shutdownMiniCluster();
174  }
175
176  @BeforeEach
177  public void setUp() throws IOException {
178    UTIL.getAdmin()
179      .createTable(TableDescriptorBuilder.newBuilder(NAME)
180        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF))
181        .setCoprocessor(FlushObserver.class.getName()).build());
182    region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
183  }
184
185  @AfterEach
186  public void tearDown() throws IOException {
187    region = null;
188    TRACKER = null;
189    UTIL.deleteTable(NAME);
190  }
191
192  @Test
193  public void test() throws IOException, InterruptedException {
194    try (Table table = UTIL.getConnection().getTable(NAME)) {
195      for (int i = 0; i < 100; i++) {
196        byte[] row = Bytes.toBytes(i);
197        table.put(
198          new Put(row, true).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
199            .setFamily(CF).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP)
200            .setType(Type.Put).setValue(Bytes.toBytes(i)).build()));
201      }
202    }
203    Tracker tracker = new Tracker();
204    TRACKER = tracker;
205    region.requestFlush(tracker);
206    tracker.await();
207    assertNull(tracker.reason);
208    assertTrue(tracker.beforeExecutionCalled);
209    assertTrue(tracker.afterExecutionCalled);
210
211    // request flush on a region with empty memstore should still success
212    tracker = new Tracker();
213    TRACKER = tracker;
214    region.requestFlush(tracker);
215    tracker.await();
216    assertNull(tracker.reason);
217    assertTrue(tracker.beforeExecutionCalled);
218    assertTrue(tracker.afterExecutionCalled);
219  }
220
221  @Test
222  public void testNotExecuted() throws IOException, InterruptedException {
223    try (Table table = UTIL.getConnection().getTable(NAME)) {
224      for (int i = 0; i < 100; i++) {
225        byte[] row = Bytes.toBytes(i);
226        table.put(
227          new Put(row, true).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
228            .setFamily(CF).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP)
229            .setType(Type.Put).setValue(Bytes.toBytes(i)).build()));
230      }
231    }
232    // here we may have overlap when calling the CP hooks so we do not assert on TRACKER
233    Tracker tracker1 = new Tracker();
234    ARRIVE = new CountDownLatch(1);
235    BLOCK = new CountDownLatch(1);
236    region.requestFlush(tracker1);
237    ARRIVE.await();
238
239    Tracker tracker2 = new Tracker();
240    region.requestFlush(tracker2);
241    tracker2.await();
242    assertNotNull(tracker2.reason);
243    assertFalse(tracker2.beforeExecutionCalled);
244    assertFalse(tracker2.afterExecutionCalled);
245
246    BLOCK.countDown();
247    tracker1.await();
248    assertNull(tracker1.reason);
249    assertTrue(tracker1.beforeExecutionCalled);
250    assertTrue(tracker1.afterExecutionCalled);
251  }
252}