001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.region;
019
020import static org.hamcrest.CoreMatchers.startsWith;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertFalse;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.SortedSet;
030import java.util.TreeSet;
031import org.apache.commons.lang3.mutable.MutableLong;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
036import org.apache.hadoop.hbase.HBaseIOException;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfoBuilder;
039import org.apache.hadoop.hbase.master.MasterServices;
040import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
041import org.apache.hadoop.hbase.master.region.MasterRegion;
042import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
043import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
044import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
045import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
047import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
048import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
049import org.apache.hadoop.hbase.testclassification.MasterTests;
050import org.apache.hadoop.hbase.testclassification.SmallTests;
051import org.apache.hadoop.hbase.util.CommonFSUtils;
052import org.junit.jupiter.api.AfterEach;
053import org.junit.jupiter.api.BeforeEach;
054import org.junit.jupiter.api.Tag;
055import org.junit.jupiter.api.Test;
056
057@SuppressWarnings("deprecation")
058@Tag(SmallTests.TAG)
059@Tag(MasterTests.TAG)
060public class TestRegionProcedureStoreMigration {
061
062  private HBaseCommonTestingUtil htu;
063
064  private MasterServices server;
065
066  private MasterRegion region;
067
068  private RegionProcedureStore store;
069
070  private WALProcedureStore walStore;
071
072  @BeforeEach
073  public void setUp() throws IOException {
074    htu = new HBaseCommonTestingUtil();
075    Configuration conf = htu.getConfiguration();
076    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
077    // Runs on local filesystem. Test does not need sync. Turn off checks.
078    conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
079    Path testDir = htu.getDataTestDir();
080    CommonFSUtils.setRootDir(conf, testDir);
081    walStore = new WALProcedureStore(conf, new LeaseRecovery() {
082
083      @Override
084      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
085      }
086    });
087    walStore.start(1);
088    walStore.recoverLease();
089    walStore.load(new LoadCounter());
090    server = RegionProcedureStoreTestHelper.mockServer(conf);
091    region = MasterRegionFactory.create(server);
092  }
093
094  @AfterEach
095  public void tearDown() throws IOException {
096    if (store != null) {
097      store.stop(true);
098    }
099    region.close(true);
100    walStore.stop(true);
101    htu.cleanupTestDir();
102  }
103
104  @Test
105  public void test() throws IOException {
106    List<RegionProcedureStoreTestProcedure> procs = new ArrayList<>();
107    for (int i = 0; i < 10; i++) {
108      RegionProcedureStoreTestProcedure proc = new RegionProcedureStoreTestProcedure();
109      walStore.insert(proc, null);
110      procs.add(proc);
111    }
112    for (int i = 5; i < 10; i++) {
113      walStore.delete(procs.get(i).getProcId());
114    }
115    walStore.stop(true);
116    SortedSet<RegionProcedureStoreTestProcedure> loadedProcs =
117      new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
118    MutableLong maxProcIdSet = new MutableLong(0);
119    store = RegionProcedureStoreTestHelper.createStore(server, region, new ProcedureLoader() {
120
121      @Override
122      public void setMaxProcId(long maxProcId) {
123        maxProcIdSet.setValue(maxProcId);
124      }
125
126      @Override
127      public void load(ProcedureIterator procIter) throws IOException {
128        while (procIter.hasNext()) {
129          RegionProcedureStoreTestProcedure proc =
130            (RegionProcedureStoreTestProcedure) procIter.next();
131          loadedProcs.add(proc);
132        }
133      }
134
135      @Override
136      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
137        if (procIter.hasNext()) {
138          fail("Found corrupted procedures");
139        }
140      }
141    });
142    assertEquals(10, maxProcIdSet.longValue());
143    assertEquals(5, loadedProcs.size());
144    int procId = 1;
145    for (RegionProcedureStoreTestProcedure proc : loadedProcs) {
146      assertEquals(procId, proc.getProcId());
147      procId++;
148    }
149    Path testDir = htu.getDataTestDir();
150    FileSystem fs = testDir.getFileSystem(htu.getConfiguration());
151    Path oldProcWALDir = new Path(testDir, WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
152    // make sure the old proc wal directory has been deleted.
153    assertFalse(fs.exists(oldProcWALDir));
154  }
155
156  @Test
157  public void testMigrateWithUnsupportedProcedures() throws IOException {
158    AssignProcedure assignProc = new AssignProcedure();
159    assignProc.setProcId(1L);
160    assignProc.setRegionInfo(RegionInfoBuilder.newBuilder(TableName.valueOf("table")).build());
161    walStore.insert(assignProc, null);
162    walStore.stop(true);
163
164    try {
165      store = RegionProcedureStoreTestHelper.createStore(server, region, new LoadCounter());
166      fail("Should fail since AssignProcedure is not supported");
167    } catch (HBaseIOException e) {
168      assertThat(e.getMessage(), startsWith("Unsupported"));
169    }
170  }
171}