diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index e4ac5b79bca7db..1b3fce75735865 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -127,6 +128,7 @@ #include "util/proto_util.h" #include "util/stopwatch.hpp" #include "util/string_util.h" +#include "util/thrift_rpc_helper.h" #include "util/thrift_util.h" #include "util/time.h" #include "util/uid_util.h" @@ -143,6 +145,37 @@ using namespace ErrorCode; const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3; +namespace { + +Status fetch_trusted_jdbc_table(ExecEnv* exec_env, int64_t catalog_id, TJdbcTable* jdbc_table) { + if (catalog_id <= 0) { + return Status::InvalidArgument("catalog id is not set for jdbc connection test"); + } + + TNetworkAddress master_addr = exec_env->cluster_info()->master_fe_addr; + if (master_addr.hostname.empty() || master_addr.port == 0) { + return Status::RpcError("master fe is not available for jdbc connection test"); + } + + TGetJdbcTestConnectionInfoRequest request; + request.__set_catalogId(catalog_id); + TGetJdbcTestConnectionInfoResult result; + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->getJdbcTestConnectionInfo(result, request); + })); + RETURN_IF_ERROR(Status::create(result.status)); + if (!result.__isset.jdbcTable) { + return Status::InternalError("frontend did not return jdbc table for catalog {}", + catalog_id); + } + *jdbc_table = result.jdbcTable; + return Status::OK(); +} + +} // namespace + DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT); @@ -977,26 +1010,21 @@ void PInternalService::test_jdbc_connection(google::protobuf::RpcController* con const PJdbcTestConnectionRequest* request, PJdbcTestConnectionResult* result, google::protobuf::Closure* done) { - bool ret = _heavy_work_pool.try_offer([request, result, done]() { + bool ret = _heavy_work_pool.try_offer([this, request, result, done]() { VLOG_RPC << "test jdbc connection"; brpc::ClosureGuard closure_guard(done); std::shared_ptr mem_tracker = MemTrackerLimiter::create_shared( MemTrackerLimiter::Type::OTHER, fmt::format("InternalService::test_jdbc_connection")); SCOPED_ATTACH_TASK(mem_tracker); - TTableDescriptor table_desc; + TJdbcTable jdbc_table; Status st = Status::OK(); - { - const uint8_t* buf = (const uint8_t*)request->jdbc_table().data(); - uint32_t len = request->jdbc_table().size(); - st = deserialize_thrift_msg(buf, &len, false, &table_desc); - if (!st.ok()) { - LOG(WARNING) << "test jdbc connection failed, errmsg=" << st; - st.to_protobuf(result->mutable_status()); - return; - } + st = fetch_trusted_jdbc_table(_exec_env, request->catalog_id(), &jdbc_table); + if (!st.ok()) { + LOG(WARNING) << "test jdbc connection failed, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; } - TJdbcTable jdbc_table = (table_desc.jdbcTable); // Resolve driver URL to absolute file:// path std::string driver_url; @@ -1013,7 +1041,7 @@ void PInternalService::test_jdbc_connection(google::protobuf::RpcController* con params["jdbc_password"] = jdbc_table.jdbc_password; params["jdbc_driver_class"] = jdbc_table.jdbc_driver_class; params["jdbc_driver_url"] = driver_url; - params["query_sql"] = request->query_str(); + params["jdbc_driver_checksum"] = jdbc_table.jdbc_driver_checksum; params["catalog_id"] = std::to_string(jdbc_table.catalog_id); params["connection_pool_min_size"] = std::to_string(jdbc_table.connection_pool_min_size); params["connection_pool_max_size"] = std::to_string(jdbc_table.connection_pool_max_size); diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcConnectionTester.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcConnectionTester.java index e3a2cd3823a832..1440cdb4bdaf90 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcConnectionTester.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcConnectionTester.java @@ -33,6 +33,7 @@ import java.sql.SQLException; import java.util.Collections; import java.util.Map; +import java.util.Objects; /** * JdbcConnectionTester is a lightweight JNI-invocable class for testing JDBC connections. @@ -46,7 +47,6 @@ *

Parameters: *

    *
  • jdbc_url, jdbc_user, jdbc_password, jdbc_driver_class, jdbc_driver_url
  • - *
  • query_sql — the test query to run
  • *
  • catalog_id, connection_pool_min_size, connection_pool_max_size, etc.
  • *
  • clean_datasource — if "true", close the datasource pool on close()
  • *
@@ -59,7 +59,7 @@ public class JdbcConnectionTester extends JniScanner { private final String jdbcPassword; private final String jdbcDriverClass; private final String jdbcDriverUrl; - private final String querySql; + private final String jdbcDriverChecksum; private final long catalogId; private final int connectionPoolMinSize; private final int connectionPoolMaxSize; @@ -80,7 +80,7 @@ public JdbcConnectionTester(int batchSize, Map params) { this.jdbcPassword = params.getOrDefault("jdbc_password", ""); this.jdbcDriverClass = params.getOrDefault("jdbc_driver_class", ""); this.jdbcDriverUrl = params.getOrDefault("jdbc_driver_url", ""); - this.querySql = params.getOrDefault("query_sql", "SELECT 1"); + this.jdbcDriverChecksum = params.getOrDefault("jdbc_driver_checksum", ""); this.catalogId = Long.parseLong(params.getOrDefault("catalog_id", "0")); this.connectionPoolMinSize = Integer.parseInt( params.getOrDefault("connection_pool_min_size", "1")); @@ -112,6 +112,10 @@ public void open() throws IOException { ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader(); try { URL[] urls = {new URL(jdbcDriverUrl)}; + String actualChecksum = BaseJdbcExecutor.computeObjectChecksum(urls[0].toString(), null); + if (!jdbcDriverChecksum.equals(actualChecksum)) { + throw new IOException("Checksum mismatch for JDBC driver."); + } ClassLoader parent = getClass().getClassLoader(); this.classLoader = URLClassLoader.newInstance(urls, parent); Thread.currentThread().setContextClassLoader(classLoader); @@ -145,7 +149,9 @@ public void open() throws IOException { } conn = hikariDataSource.getConnection(); - stmt = conn.prepareStatement(querySql); + String validationQuery = Objects.requireNonNull( + hikariDataSource.getConnectionTestQuery(), "validation query"); + stmt = conn.prepareStatement(validationQuery); ResultSet rs = stmt.executeQuery(); if (!rs.next()) { throw new IOException( diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index 6a7d58d2f3f71c..219f9b708058d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -48,18 +48,13 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TOdbcTableType; import org.apache.doris.thrift.TStatusCode; -import org.apache.doris.thrift.TTableDescriptor; -import org.apache.doris.thrift.TTableType; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; import java.io.IOException; import java.util.List; @@ -408,12 +403,8 @@ private void testBeToJdbcConnection(JdbcClient testClient) throws DdlException { } TNetworkAddress address = new TNetworkAddress(aliveBe.getHost(), aliveBe.getBrpcPort()); try { - TTableDescriptor testThrift = buildTestConnectionThrift(); TOdbcTableType tableType = JdbcExternalTable.parseJdbcType(testClient.getDbType()); - PJdbcTestConnectionRequest request = InternalService.PJdbcTestConnectionRequest.newBuilder() - .setJdbcTable(ByteString.copyFrom(new TSerializer().serialize(testThrift))) - .setJdbcTableType(tableType.getValue()) - .setQueryStr(testClient.getTestQuery()).build(); + PJdbcTestConnectionRequest request = buildTestConnectionRequest(tableType); InternalService.PJdbcTestConnectionResult result = null; Future future = BackendServiceProxy.getInstance() .testJdbcConnection(address, request); @@ -422,12 +413,19 @@ private void testBeToJdbcConnection(JdbcClient testClient) throws DdlException { if (code != TStatusCode.OK) { throw new DdlException("Test BE Connection to JDBC Failed: " + result.getStatus().getErrorMsgs(0)); } - } catch (TException | RpcException | ExecutionException | InterruptedException e) { + } catch (RpcException | ExecutionException | InterruptedException e) { throw new RuntimeException(e); } } - private TTableDescriptor buildTestConnectionThrift() throws DdlException { + public PJdbcTestConnectionRequest buildTestConnectionRequest(TOdbcTableType tableType) { + return InternalService.PJdbcTestConnectionRequest.newBuilder() + .setCatalogId(getId()) + .setJdbcTableType(tableType.getValue()) + .build(); + } + + public TJdbcTable buildTrustedJdbcTable() throws DdlException { TJdbcTable tJdbcTable = new TJdbcTable(); tJdbcTable.setCatalogId(this.getId()); tJdbcTable.setJdbcUrl(getJdbcUrl()); @@ -437,16 +435,16 @@ private TTableDescriptor buildTestConnectionThrift() throws DdlException { tJdbcTable.setJdbcDriverClass(getDriverClass()); tJdbcTable.setJdbcDriverUrl(getDriverUrl()); tJdbcTable.setJdbcResourceName(""); - tJdbcTable.setJdbcDriverChecksum(JdbcResource.computeObjectChecksum(getDriverUrl())); + String driverChecksum = Strings.isNullOrEmpty(getCheckSum()) + ? JdbcResource.computeObjectChecksum(getDriverUrl()) + : getCheckSum(); + tJdbcTable.setJdbcDriverChecksum(driverChecksum); tJdbcTable.setConnectionPoolMinSize(getConnectionPoolMinSize()); tJdbcTable.setConnectionPoolMaxSize(getConnectionPoolMaxSize()); tJdbcTable.setConnectionPoolMaxWaitTime(getConnectionPoolMaxWaitTime()); tJdbcTable.setConnectionPoolMaxLifeTime(getConnectionPoolMaxLifeTime()); tJdbcTable.setConnectionPoolKeepAlive(isConnectionPoolKeepAlive()); - TTableDescriptor tTableDescriptor = new TTableDescriptor(0, TTableType.JDBC_TABLE, 0, 0, - "test_jdbc_connection", ""); - tTableDescriptor.setJdbcTable(tJdbcTable); - return tTableDescriptor; + return tJdbcTable; } public ExternalFunctionRules getFunctionRules() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index b9d06cfcd4ef8e..c0b08b9a163f0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -82,6 +82,7 @@ import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.SplitSource; +import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; import org.apache.doris.encryption.EncryptionKey; import org.apache.doris.info.TableNameInfo; import org.apache.doris.info.TableRefInfo; @@ -194,6 +195,8 @@ import org.apache.doris.thrift.TGetDbsResult; import org.apache.doris.thrift.TGetEncryptionKeysRequest; import org.apache.doris.thrift.TGetEncryptionKeysResult; +import org.apache.doris.thrift.TGetJdbcTestConnectionInfoRequest; +import org.apache.doris.thrift.TGetJdbcTestConnectionInfoResult; import org.apache.doris.thrift.TGetMasterTokenRequest; import org.apache.doris.thrift.TGetMasterTokenResult; import org.apache.doris.thrift.TGetMetaDB; @@ -221,6 +224,7 @@ import org.apache.doris.thrift.TInsertOverwriteTaskRequest; import org.apache.doris.thrift.TInsertOverwriteTaskResult; import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest; +import org.apache.doris.thrift.TJdbcTable; import org.apache.doris.thrift.TListPrivilegesResult; import org.apache.doris.thrift.TListTableMetadataNameIdsResult; import org.apache.doris.thrift.TListTableStatusResult; @@ -3198,6 +3202,36 @@ private TInitExternalCtlMetaResult initDb(long catalogId, long dbId) throws TExc return result; } + @Override + public TGetJdbcTestConnectionInfoResult getJdbcTestConnectionInfo(TGetJdbcTestConnectionInfoRequest request) + throws TException { + TGetJdbcTestConnectionInfoResult result = new TGetJdbcTestConnectionInfoResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + + if (!request.isSetCatalogId()) { + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs("catalog id is not set"); + return result; + } + + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(request.getCatalogId()); + if (!(catalog instanceof JdbcExternalCatalog)) { + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs("catalog is not a JDBC external catalog: " + request.getCatalogId()); + return result; + } + + try { + TJdbcTable jdbcTable = ((JdbcExternalCatalog) catalog).buildTrustedJdbcTable(); + result.setJdbcTable(jdbcTable); + } catch (DdlException e) { + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + } + return result; + } + @Override public TMySqlLoadAcquireTokenResult acquireToken() throws TException { String clientAddr = getClientAddrAsString(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java index c63aebfcb35b0a..804e870c226de2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java @@ -22,6 +22,9 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.datasource.CatalogFactory; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.proto.InternalService; +import org.apache.doris.thrift.TJdbcTable; +import org.apache.doris.thrift.TOdbcTableType; import com.google.common.collect.Maps; import org.junit.Assert; @@ -111,4 +114,26 @@ public void checkPropertiesTest() { exceptione3.getMessage()); } + + @Test + public void testBuildTestConnectionRequestUsesCatalogIdOnly() throws DdlException { + InternalService.PJdbcTestConnectionRequest request = jdbcExternalCatalog + .buildTestConnectionRequest(TOdbcTableType.ORACLE); + Assert.assertTrue(request.hasCatalogId()); + Assert.assertEquals(1L, request.getCatalogId()); + Assert.assertFalse(request.hasJdbcTable()); + Assert.assertFalse(request.hasQueryStr()); + Assert.assertEquals(TOdbcTableType.ORACLE.getValue(), request.getJdbcTableType()); + } + + @Test + public void testBuildTrustedJdbcTableUsesCatalogProperties() throws DdlException { + TJdbcTable jdbcTable = jdbcExternalCatalog.buildTrustedJdbcTable(); + Assert.assertEquals(1L, jdbcTable.getCatalogId()); + Assert.assertEquals("jdbc:oracle:thin:@127.0.0.1:1521:XE", jdbcTable.getJdbcUrl()); + Assert.assertEquals("oracle.jdbc.driver.OracleDriver", jdbcTable.getJdbcDriverClass()); + Assert.assertEquals("ojdbc8.jar", jdbcTable.getJdbcDriverUrl()); + Assert.assertEquals("", jdbcTable.getJdbcDriverChecksum()); + Assert.assertEquals("test_jdbc_connection", jdbcTable.getJdbcTableName()); + } } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 50274c176274fa..c81f5be4a119ce 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -779,6 +779,7 @@ message PJdbcTestConnectionRequest { optional bytes jdbc_table = 1; optional int32 jdbc_table_type = 2; optional string query_str = 3; + optional int64 catalog_id = 4; } message PJdbcTestConnectionResult { @@ -1239,4 +1240,3 @@ service PBackendService { rpc fetch_peer_data(PFetchPeerDataRequest) returns (PFetchPeerDataResponse); rpc request_cdc_client(PRequestCdcClientRequest) returns (PRequestCdcClientResult); }; - diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 1d8f6e6bbd9f3c..65ee18d79d85fe 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -855,6 +855,15 @@ struct TInitExternalCtlMetaResult { 2: optional string status; } +struct TGetJdbcTestConnectionInfoRequest { + 1: optional i64 catalogId +} + +struct TGetJdbcTestConnectionInfoResult { + 1: required Status.TStatus status + 2: optional Descriptors.TJdbcTable jdbcTable +} + enum TSchemaTableName { // BACKENDS = 0, METADATA_TABLE = 1, // tvf @@ -1934,6 +1943,8 @@ service FrontendService { TInitExternalCtlMetaResult initExternalCtlMeta(1: TInitExternalCtlMetaRequest request) + TGetJdbcTestConnectionInfoResult getJdbcTestConnectionInfo(1: TGetJdbcTestConnectionInfoRequest request) + TFetchSchemaTableDataResult fetchSchemaTableData(1: TFetchSchemaTableDataRequest request) TMySqlLoadAcquireTokenResult acquireToken()