🌐 AI搜索 & 代理 主页
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.leader.HouseKeepingTasks;
import org.apache.hadoop.hive.metastore.leader.LeaderElection;
import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
import org.apache.hadoop.hive.metastore.leader.LeaderElectionFactory;
Expand All @@ -42,6 +43,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -290,9 +292,10 @@ public void setName(String name) {
protected void notifyListener() {
ScheduledExecutorService service = null;
if (!isLeader) {
try {
service = ThreadPool.getPool();
} catch (Exception ignored) {
Optional<LeadershipStateListener> houseKeepingTasks =
listeners.stream().filter(s -> s instanceof HouseKeepingTasks).findFirst();
if (houseKeepingTasks.isPresent()) {
service = ((HouseKeepingTasks) houseKeepingTasks.get()).getExecutorService();
}
}
super.notifyListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfForTest;
import org.apache.hadoop.hive.metastore.ThreadPool;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
Expand Down Expand Up @@ -549,7 +548,6 @@ public void setUp() throws Exception {
public void tearDown() throws Exception {
if (txnMgr != null) txnMgr.closeTxnManager();
TestTxnDbUtil.cleanDb(conf);
ThreadPool.shutdown();
}

private static class MockQueryPlan extends QueryPlan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ public static void main(String[] args) throws Throwable {
} catch (Exception e) {
LOG.error("Error removing znode for this metastore instance from ZooKeeper.", e);
}
ThreadPool.shutdown();
}, 10);

//Start Metrics for Standalone (Remote) Mode
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@

package org.apache.hadoop.hive.metastore.leader;

import com.cronutils.utils.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.ThreadPool;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.service.CompactionHouseKeeperService;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;
Expand All @@ -38,7 +43,7 @@ public class HouseKeepingTasks implements LeaderElection.LeadershipStateListener
private final Configuration configuration;

// shut down pool when new leader is selected
private ThreadPool metastoreTaskThreadPool;
private ScheduledExecutorService metastoreTaskThreadPool;

private boolean runOnlyRemoteTasks;

Expand Down Expand Up @@ -94,30 +99,24 @@ public void takeLeadership(LeaderElection election) throws Exception {
throw new IllegalStateException("There should be no running tasks before taking the leadership!");
}
runningTasks = new ArrayList<>();
metastoreTaskThreadPool = ThreadPool.initialize(configuration);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Metastore Scheduled Worker(" + election.getName() + ") %d").build();
final List<MetastoreTaskThread> tasks;
if (!runOnlyRemoteTasks) {
List<MetastoreTaskThread> alwaysTasks = new ArrayList<>(getAlwaysTasks());
for (MetastoreTaskThread task : alwaysTasks) {
task.setConf(configuration);
task.enforceMutex(election.enforceMutex());
long freq = task.runFrequency(TimeUnit.MILLISECONDS);
// For backwards compatibility, since some threads used to be hard coded but only run if
// frequency was > 0
if (freq > 0) {
runningTasks.add(task);
metastoreTaskThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
}
}
tasks = new ArrayList<>(getAlwaysTasks());
} else {
List<MetastoreTaskThread> remoteOnlyTasks = new ArrayList<>(getRemoteOnlyTasks());
for (MetastoreTaskThread task : remoteOnlyTasks) {
task.setConf(configuration);
task.enforceMutex(election.enforceMutex());
long freq = task.runFrequency(TimeUnit.MILLISECONDS);
if (freq > 0) {
runningTasks.add(task);
metastoreTaskThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
}
tasks = new ArrayList<>(getRemoteOnlyTasks());
}
int poolSize = Math.min(MetastoreConf.getIntVar(configuration,
MetastoreConf.ConfVars.THREAD_POOL_SIZE), tasks.size());
metastoreTaskThreadPool = Executors.newScheduledThreadPool(poolSize, threadFactory);
for (MetastoreTaskThread task : tasks) {
task.setConf(configuration);
task.enforceMutex(election.enforceMutex());
long freq = task.runFrequency(TimeUnit.MILLISECONDS);
if (freq > 0) {
runningTasks.add(task);
metastoreTaskThreadPool.scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
}
}

Expand All @@ -141,4 +140,8 @@ public void lossLeadership(LeaderElection election) throws Exception {
}
}

@VisibleForTesting
public ScheduledExecutorService getExecutorService() {
return metastoreTaskThreadPool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
private volatile boolean stopped = false;

// Leadership change listeners
private final List<LeadershipStateListener> listeners = new ArrayList<>();
protected final List<LeadershipStateListener> listeners = new ArrayList<>();

protected String name;
private final String userName;
Expand Down