Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 41 additions & 13 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <fcntl.h>
#include <fmt/core.h>
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/MasterService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
Expand Down Expand Up @@ -127,6 +128,7 @@
#include "util/proto_util.h"
#include "util/stopwatch.hpp"
#include "util/string_util.h"
#include "util/thrift_rpc_helper.h"
#include "util/thrift_util.h"
#include "util/time.h"
#include "util/uid_util.h"
Expand All @@ -143,6 +145,37 @@ using namespace ErrorCode;

const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;

namespace {

Status fetch_trusted_jdbc_table(ExecEnv* exec_env, int64_t catalog_id, TJdbcTable* jdbc_table) {
if (catalog_id <= 0) {
return Status::InvalidArgument("catalog id is not set for jdbc connection test");
}

TNetworkAddress master_addr = exec_env->cluster_info()->master_fe_addr;
if (master_addr.hostname.empty() || master_addr.port == 0) {
return Status::RpcError("master fe is not available for jdbc connection test");
}

TGetJdbcTestConnectionInfoRequest request;
request.__set_catalogId(catalog_id);
TGetJdbcTestConnectionInfoResult result;
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->getJdbcTestConnectionInfo(result, request);
}));
RETURN_IF_ERROR(Status::create(result.status));
if (!result.__isset.jdbcTable) {
return Status::InternalError("frontend did not return jdbc table for catalog {}",
catalog_id);
}
*jdbc_table = result.jdbcTable;
return Status::OK();
}

} // namespace

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT);
Expand Down Expand Up @@ -977,26 +1010,21 @@ void PInternalService::test_jdbc_connection(google::protobuf::RpcController* con
const PJdbcTestConnectionRequest* request,
PJdbcTestConnectionResult* result,
google::protobuf::Closure* done) {
bool ret = _heavy_work_pool.try_offer([request, result, done]() {
bool ret = _heavy_work_pool.try_offer([this, request, result, done]() {
VLOG_RPC << "test jdbc connection";
brpc::ClosureGuard closure_guard(done);
std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER,
fmt::format("InternalService::test_jdbc_connection"));
SCOPED_ATTACH_TASK(mem_tracker);
TTableDescriptor table_desc;
TJdbcTable jdbc_table;
Status st = Status::OK();
{
const uint8_t* buf = (const uint8_t*)request->jdbc_table().data();
uint32_t len = request->jdbc_table().size();
st = deserialize_thrift_msg(buf, &len, false, &table_desc);
if (!st.ok()) {
LOG(WARNING) << "test jdbc connection failed, errmsg=" << st;
st.to_protobuf(result->mutable_status());
return;
}
st = fetch_trusted_jdbc_table(_exec_env, request->catalog_id(), &jdbc_table);
if (!st.ok()) {
LOG(WARNING) << "test jdbc connection failed, errmsg=" << st;
st.to_protobuf(result->mutable_status());
return;
}
TJdbcTable jdbc_table = (table_desc.jdbcTable);

// Resolve driver URL to absolute file:// path
std::string driver_url;
Expand All @@ -1013,7 +1041,7 @@ void PInternalService::test_jdbc_connection(google::protobuf::RpcController* con
params["jdbc_password"] = jdbc_table.jdbc_password;
params["jdbc_driver_class"] = jdbc_table.jdbc_driver_class;
params["jdbc_driver_url"] = driver_url;
params["query_sql"] = request->query_str();
params["jdbc_driver_checksum"] = jdbc_table.jdbc_driver_checksum;
params["catalog_id"] = std::to_string(jdbc_table.catalog_id);
params["connection_pool_min_size"] = std::to_string(jdbc_table.connection_pool_min_size);
params["connection_pool_max_size"] = std::to_string(jdbc_table.connection_pool_max_size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

/**
* JdbcConnectionTester is a lightweight JNI-invocable class for testing JDBC connections.
Expand All @@ -46,7 +47,6 @@
* <p>Parameters:
* <ul>
* <li>jdbc_url, jdbc_user, jdbc_password, jdbc_driver_class, jdbc_driver_url</li>
* <li>query_sql — the test query to run</li>
* <li>catalog_id, connection_pool_min_size, connection_pool_max_size, etc.</li>
* <li>clean_datasource — if "true", close the datasource pool on close()</li>
* </ul>
Expand All @@ -59,7 +59,7 @@ public class JdbcConnectionTester extends JniScanner {
private final String jdbcPassword;
private final String jdbcDriverClass;
private final String jdbcDriverUrl;
private final String querySql;
private final String jdbcDriverChecksum;
private final long catalogId;
private final int connectionPoolMinSize;
private final int connectionPoolMaxSize;
Expand All @@ -80,7 +80,7 @@ public JdbcConnectionTester(int batchSize, Map<String, String> params) {
this.jdbcPassword = params.getOrDefault("jdbc_password", "");
this.jdbcDriverClass = params.getOrDefault("jdbc_driver_class", "");
this.jdbcDriverUrl = params.getOrDefault("jdbc_driver_url", "");
this.querySql = params.getOrDefault("query_sql", "SELECT 1");
this.jdbcDriverChecksum = params.getOrDefault("jdbc_driver_checksum", "");
this.catalogId = Long.parseLong(params.getOrDefault("catalog_id", "0"));
this.connectionPoolMinSize = Integer.parseInt(
params.getOrDefault("connection_pool_min_size", "1"));
Expand Down Expand Up @@ -112,6 +112,10 @@ public void open() throws IOException {
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
try {
URL[] urls = {new URL(jdbcDriverUrl)};
String actualChecksum = BaseJdbcExecutor.computeObjectChecksum(urls[0].toString(), null);
if (!jdbcDriverChecksum.equals(actualChecksum)) {
throw new IOException("Checksum mismatch for JDBC driver.");
}
ClassLoader parent = getClass().getClassLoader();
this.classLoader = URLClassLoader.newInstance(urls, parent);
Thread.currentThread().setContextClassLoader(classLoader);
Expand Down Expand Up @@ -145,7 +149,9 @@ public void open() throws IOException {
}

conn = hikariDataSource.getConnection();
stmt = conn.prepareStatement(querySql);
String validationQuery = Objects.requireNonNull(
hikariDataSource.getConnectionTestQuery(), "validation query");
stmt = conn.prepareStatement(validationQuery);
ResultSet rs = stmt.executeQuery();
if (!rs.next()) {
throw new IOException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,13 @@
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TOdbcTableType;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -408,12 +403,8 @@ private void testBeToJdbcConnection(JdbcClient testClient) throws DdlException {
}
TNetworkAddress address = new TNetworkAddress(aliveBe.getHost(), aliveBe.getBrpcPort());
try {
TTableDescriptor testThrift = buildTestConnectionThrift();
TOdbcTableType tableType = JdbcExternalTable.parseJdbcType(testClient.getDbType());
PJdbcTestConnectionRequest request = InternalService.PJdbcTestConnectionRequest.newBuilder()
.setJdbcTable(ByteString.copyFrom(new TSerializer().serialize(testThrift)))
.setJdbcTableType(tableType.getValue())
.setQueryStr(testClient.getTestQuery()).build();
PJdbcTestConnectionRequest request = buildTestConnectionRequest(tableType);
InternalService.PJdbcTestConnectionResult result = null;
Future<PJdbcTestConnectionResult> future = BackendServiceProxy.getInstance()
.testJdbcConnection(address, request);
Expand All @@ -422,12 +413,19 @@ private void testBeToJdbcConnection(JdbcClient testClient) throws DdlException {
if (code != TStatusCode.OK) {
throw new DdlException("Test BE Connection to JDBC Failed: " + result.getStatus().getErrorMsgs(0));
}
} catch (TException | RpcException | ExecutionException | InterruptedException e) {
} catch (RpcException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}

private TTableDescriptor buildTestConnectionThrift() throws DdlException {
public PJdbcTestConnectionRequest buildTestConnectionRequest(TOdbcTableType tableType) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking: this switches the BE connectivity test to resolve the catalog back out of CatalogMgr, but checkWhenCreating() runs before the new catalog is registered there. The create flow is CatalogFactory.createCatalog(...)->catalog.checkWhenCreating() first, and only later CatalogMgr.createCatalogImpl(...)->createCatalogInternal(...) adds the catalog. In that window FrontendServiceImpl.getJdbcTestConnectionInfo() will see getCatalog(request.getCatalogId()) == null, so CREATE CATALOG ... PROPERTIES('type'='jdbc', ...) now fails whenever test_connection=true and a BE is available. Please keep using the already-validated in-memory catalog properties during creation, or add a separate trusted path that does not depend on the catalog already existing in CatalogMgr.

return InternalService.PJdbcTestConnectionRequest.newBuilder()
.setCatalogId(getId())
.setJdbcTableType(tableType.getValue())
.build();
}

public TJdbcTable buildTrustedJdbcTable() throws DdlException {
TJdbcTable tJdbcTable = new TJdbcTable();
tJdbcTable.setCatalogId(this.getId());
tJdbcTable.setJdbcUrl(getJdbcUrl());
Expand All @@ -437,16 +435,16 @@ private TTableDescriptor buildTestConnectionThrift() throws DdlException {
tJdbcTable.setJdbcDriverClass(getDriverClass());
tJdbcTable.setJdbcDriverUrl(getDriverUrl());
tJdbcTable.setJdbcResourceName("");
tJdbcTable.setJdbcDriverChecksum(JdbcResource.computeObjectChecksum(getDriverUrl()));
String driverChecksum = Strings.isNullOrEmpty(getCheckSum())
? JdbcResource.computeObjectChecksum(getDriverUrl())
: getCheckSum();
tJdbcTable.setJdbcDriverChecksum(driverChecksum);
tJdbcTable.setConnectionPoolMinSize(getConnectionPoolMinSize());
tJdbcTable.setConnectionPoolMaxSize(getConnectionPoolMaxSize());
tJdbcTable.setConnectionPoolMaxWaitTime(getConnectionPoolMaxWaitTime());
tJdbcTable.setConnectionPoolMaxLifeTime(getConnectionPoolMaxLifeTime());
tJdbcTable.setConnectionPoolKeepAlive(isConnectionPoolKeepAlive());
TTableDescriptor tTableDescriptor = new TTableDescriptor(0, TTableType.JDBC_TABLE, 0, 0,
"test_jdbc_connection", "");
tTableDescriptor.setJdbcTable(tJdbcTable);
return tTableDescriptor;
return tJdbcTable;
}

public ExternalFunctionRules getFunctionRules() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.SplitSource;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.encryption.EncryptionKey;
import org.apache.doris.info.TableNameInfo;
import org.apache.doris.info.TableRefInfo;
Expand Down Expand Up @@ -194,6 +195,8 @@
import org.apache.doris.thrift.TGetDbsResult;
import org.apache.doris.thrift.TGetEncryptionKeysRequest;
import org.apache.doris.thrift.TGetEncryptionKeysResult;
import org.apache.doris.thrift.TGetJdbcTestConnectionInfoRequest;
import org.apache.doris.thrift.TGetJdbcTestConnectionInfoResult;
import org.apache.doris.thrift.TGetMasterTokenRequest;
import org.apache.doris.thrift.TGetMasterTokenResult;
import org.apache.doris.thrift.TGetMetaDB;
Expand Down Expand Up @@ -221,6 +224,7 @@
import org.apache.doris.thrift.TInsertOverwriteTaskRequest;
import org.apache.doris.thrift.TInsertOverwriteTaskResult;
import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TJdbcTable;
import org.apache.doris.thrift.TListPrivilegesResult;
import org.apache.doris.thrift.TListTableMetadataNameIdsResult;
import org.apache.doris.thrift.TListTableStatusResult;
Expand Down Expand Up @@ -3198,6 +3202,36 @@ private TInitExternalCtlMetaResult initDb(long catalogId, long dbId) throws TExc
return result;
}

@Override
public TGetJdbcTestConnectionInfoResult getJdbcTestConnectionInfo(TGetJdbcTestConnectionInfoRequest request)
throws TException {
TGetJdbcTestConnectionInfoResult result = new TGetJdbcTestConnectionInfoResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);

if (!request.isSetCatalogId()) {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("catalog id is not set");
return result;
}

CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(request.getCatalogId());
if (!(catalog instanceof JdbcExternalCatalog)) {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("catalog is not a JDBC external catalog: " + request.getCatalogId());
return result;
}

try {
TJdbcTable jdbcTable = ((JdbcExternalCatalog) catalog).buildTrustedJdbcTable();
result.setJdbcTable(jdbcTable);
} catch (DdlException e) {
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
}
return result;
}

@Override
public TMySqlLoadAcquireTokenResult acquireToken() throws TException {
String clientAddr = getClientAddrAsString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.CatalogFactory;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.proto.InternalService;
import org.apache.doris.thrift.TJdbcTable;
import org.apache.doris.thrift.TOdbcTableType;

import com.google.common.collect.Maps;
import org.junit.Assert;
Expand Down Expand Up @@ -111,4 +114,26 @@ public void checkPropertiesTest() {
exceptione3.getMessage());

}

@Test
public void testBuildTestConnectionRequestUsesCatalogIdOnly() throws DdlException {
InternalService.PJdbcTestConnectionRequest request = jdbcExternalCatalog
.buildTestConnectionRequest(TOdbcTableType.ORACLE);
Assert.assertTrue(request.hasCatalogId());
Assert.assertEquals(1L, request.getCatalogId());
Assert.assertFalse(request.hasJdbcTable());
Assert.assertFalse(request.hasQueryStr());
Assert.assertEquals(TOdbcTableType.ORACLE.getValue(), request.getJdbcTableType());
}

@Test
public void testBuildTrustedJdbcTableUsesCatalogProperties() throws DdlException {
TJdbcTable jdbcTable = jdbcExternalCatalog.buildTrustedJdbcTable();
Assert.assertEquals(1L, jdbcTable.getCatalogId());
Assert.assertEquals("jdbc:oracle:thin:@127.0.0.1:1521:XE", jdbcTable.getJdbcUrl());
Assert.assertEquals("oracle.jdbc.driver.OracleDriver", jdbcTable.getJdbcDriverClass());
Assert.assertEquals("ojdbc8.jar", jdbcTable.getJdbcDriverUrl());
Assert.assertEquals("", jdbcTable.getJdbcDriverChecksum());
Assert.assertEquals("test_jdbc_connection", jdbcTable.getJdbcTableName());
}
}
2 changes: 1 addition & 1 deletion gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ message PJdbcTestConnectionRequest {
optional bytes jdbc_table = 1;
optional int32 jdbc_table_type = 2;
optional string query_str = 3;
optional int64 catalog_id = 4;
}

message PJdbcTestConnectionResult {
Expand Down Expand Up @@ -1239,4 +1240,3 @@ service PBackendService {
rpc fetch_peer_data(PFetchPeerDataRequest) returns (PFetchPeerDataResponse);
rpc request_cdc_client(PRequestCdcClientRequest) returns (PRequestCdcClientResult);
};

11 changes: 11 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,15 @@ struct TInitExternalCtlMetaResult {
2: optional string status;
}

struct TGetJdbcTestConnectionInfoRequest {
1: optional i64 catalogId
}

struct TGetJdbcTestConnectionInfoResult {
1: required Status.TStatus status
2: optional Descriptors.TJdbcTable jdbcTable
}

enum TSchemaTableName {
// BACKENDS = 0,
METADATA_TABLE = 1, // tvf
Expand Down Expand Up @@ -1934,6 +1943,8 @@ service FrontendService {

TInitExternalCtlMetaResult initExternalCtlMeta(1: TInitExternalCtlMetaRequest request)

TGetJdbcTestConnectionInfoResult getJdbcTestConnectionInfo(1: TGetJdbcTestConnectionInfoRequest request)

TFetchSchemaTableDataResult fetchSchemaTableData(1: TFetchSchemaTableDataRequest request)

TMySqlLoadAcquireTokenResult acquireToken()
Expand Down
Loading