diff --git a/docs/en/14-reference/01-components/01-taosd.md b/docs/en/14-reference/01-components/01-taosd.md index a3e4bc42ce36..dfdd99e0bba4 100644 --- a/docs/en/14-reference/01-components/01-taosd.md +++ b/docs/en/14-reference/01-components/01-taosd.md @@ -436,7 +436,7 @@ The effective value of charset is UTF-8. | mqRebalanceInterval | | Supported, effective immediately | Internal parameter, interval for consumer rebalancing | | uptimeInterval | | Supported, effective immediately | Internal parameter, for recording system uptime | | timeseriesThreshold | | Supported, effective immediately | Internal parameter, for usage statistics | -| udf | | Supported, effective after restart | Whether to start UDF service; 0: do not start, 1: start; default value 1(The default value on Windows is 0.) | +| udf | | Supported, effective after restart | Whether to start UDF service; 0: do not start, 1: start; default value 1 | | udfdResFuncs | | Supported, effective after restart | Internal parameter, for setting UDF result sets| | udfdLdLibPath | | Supported, effective after restart | Internal parameter, indicates the library path for loading UDF | | streamBatchRequestWaitMs | | Supported, effective after restart | Stream computing batch request wait time, range 0-1800000, in milliseconds, default value 5000 | diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index 3d2b1be562e0..be6d00009d34 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -1857,7 +1857,7 @@ charset 的有效值是 UTF-8。 - 说明:是否启动 UDF 服务 - 类型:整数;0:不启动,1:启动。 -- 默认值:1(windows 上默认值为 0) +- 默认值:1 - 最小值:0 - 最大值:1 - 参数类型:全局配置参数 diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 5c6c44a2a536..ebad0dd29d00 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -442,10 +442,10 @@ bool tsIfAdtFse = false; // ADT-FSE algorithom or origina char tsCompressor[32] = 
"ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR // udf -#if defined(WINDOWS) || !defined(USE_UDF) -bool tsStartUdfd = false; -#else +#ifdef USE_UDF bool tsStartUdfd = true; +#else +bool tsStartUdfd = false; #endif // wal diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index a7fe9e7d5647..7178130b50d9 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -427,10 +427,6 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) { TAOS_CHECK_GOTO(tDeserializeSCreateFuncReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER); -#ifdef WINDOWS - code = TSDB_CODE_MND_INVALID_PLATFORM; - goto _OVER; -#endif mInfo("func:%s, start to create, size:%d", createReq.name, createReq.codeLen); TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_FUNC), NULL, _OVER); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 50b101acfdf4..c334be561049 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -1359,7 +1359,7 @@ int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult, int32_t code = TSDB_CODE_SUCCESS; SExecTaskInfo* savedTaskInfo = gTaskScalarExtra.pTaskInfo; - __typeof__(gTaskScalarExtra.isTaskKilled) savedIsTaskKilled = gTaskScalarExtra.isTaskKilled; + sclIsTaskKilled savedIsTaskKilled = gTaskScalarExtra.isTaskKilled; if (pTaskInfo != NULL) { gTaskScalarExtra.pTaskInfo = pTaskInfo; diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 4943165cd5ba..ff5647d30167 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -54,7 +54,17 @@ SUdfdData udfdGlobal = {0}; int32_t udfStartUdfd(int32_t startDnodeId); void udfStopUdfd(); -extern char **environ; +/* Note: udfd spawning now uses libuv's portable ``uv_os_environ`` instead + * of the POSIX 
``extern char **environ`` symbol so the same code works on + * Windows. */ + +#ifdef WINDOWS +#define UDF_LIB_PATH_ENV "PATH" +#define UDF_LIB_PATH_SEP ';' +#else +#define UDF_LIB_PATH_ENV "LD_LIBRARY_PATH" +#define UDF_LIB_PATH_SEP ':' +#endif static int32_t udfSpawnUdfd(SUdfdData *pData); void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal); @@ -87,6 +97,11 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { int32_t err = 0; uv_process_options_t options = {0}; + char *pathTaosdLdLibHeap = NULL; + char *udfdPathLdLib = NULL; + char *ldLibPathEnvItem = NULL; + char *taosFqdnEnvItem = NULL; + char **envUdfdWithPEnv = NULL; char path[PATH_MAX] = {0}; if (tsProcPath == NULL) { @@ -121,10 +136,24 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { options.exit_cb = udfUdfdExit; - TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1)); - + // Stdio container references &pData->ctrlPipe by pointer; the pointer is + // stable across this function regardless of when the handle is initialized. + // We defer uv_pipe_init until just before uv_spawn (below) so that none of + // the env-construction failure paths that run before it can goto _OVER with an + // already-initialized loop handle that would later trip uv_loop_close. uv_stdio_container_t child_stdio[3]; + // UV_OVERLAPPED_PIPE is required on Windows: without it libuv uses + // CreatePipe() which yields a non-overlapped anonymous pipe, and the + // child's uv_pipe_open(fd 0) fails because libuv's IOCP machinery + // requires FILE_FLAG_OVERLAPPED. On POSIX the same flag is aliased to + // UV_NONBLOCK_PIPE in libuv >=1.49 -- adding it would put the child's + // stdin into non-blocking mode and risk data loss for an unaware child, + // so we only set it on Windows. 
+#ifdef WINDOWS + child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_OVERLAPPED_PIPE; +#else child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; +#endif child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe; child_stdio[1].flags = UV_IGNORE; child_stdio[2].flags = UV_INHERIT_FD; @@ -146,28 +175,57 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { numCpuCores = TMAX(numCpuCores, 2); snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2); - char pathTaosdLdLib[512] = {0}; - size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib); - int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen); - if (ret != UV_ENOBUFS) { + char pathTaosdLdLibStack[512] = {0}; + char *pathTaosdLdLib = pathTaosdLdLibStack; + size_t taosdLdLibPathLen = sizeof(pathTaosdLdLibStack); + int32_t ret = uv_os_getenv(UDF_LIB_PATH_ENV, pathTaosdLdLib, &taosdLdLibPathLen); + if (ret == UV_ENOBUFS) { + // taosdLdLibPathLen now holds the required buffer size (incl. NUL). 
+ pathTaosdLdLibHeap = (char *)taosMemoryCalloc(taosdLdLibPathLen, 1); + if (pathTaosdLdLibHeap == NULL) { + err = terrno; + goto _OVER; + } + pathTaosdLdLib = pathTaosdLdLibHeap; + ret = uv_os_getenv(UDF_LIB_PATH_ENV, pathTaosdLdLib, &taosdLdLibPathLen); + if (ret != 0) { + pathTaosdLdLib[0] = '\0'; + taosdLdLibPathLen = 0; + } else { + taosdLdLibPathLen = strlen(pathTaosdLdLib); + } + } else if (ret != 0) { + pathTaosdLdLib[0] = '\0'; + taosdLdLibPathLen = 0; + } else { taosdLdLibPathLen = strlen(pathTaosdLdLib); } - char udfdPathLdLib[1024] = {0}; size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath); - tstrncpy(udfdPathLdLib, tsUdfdLdLibPath, sizeof(udfdPathLdLib)); - - udfdPathLdLib[udfdLdLibPathLen] = ':'; - tstrncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen - 1); - if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) { - fnInfo("udfd LD_LIBRARY_PATH: %s", udfdPathLdLib); + size_t joinedLen = udfdLdLibPathLen + taosdLdLibPathLen + 2; // sep + NUL + udfdPathLdLib = (char *)taosMemoryCalloc(joinedLen, 1); + if (udfdPathLdLib == NULL) { + err = terrno; + goto _OVER; + } + if (udfdLdLibPathLen > 0 && taosdLdLibPathLen > 0) { + snprintf(udfdPathLdLib, joinedLen, "%s%c%s", + tsUdfdLdLibPath, UDF_LIB_PATH_SEP, pathTaosdLdLib); + } else if (udfdLdLibPathLen > 0) { + tstrncpy(udfdPathLdLib, tsUdfdLdLibPath, joinedLen); } else { - fnError("can not set correct udfd LD_LIBRARY_PATH"); + tstrncpy(udfdPathLdLib, pathTaosdLdLib, joinedLen); + } + fnInfo("udfd %s: %s", UDF_LIB_PATH_ENV, udfdPathLdLib); + + size_t ldLibEnvLen = strlen(UDF_LIB_PATH_ENV) + 1 /* '=' */ + strlen(udfdPathLdLib) + 1; + ldLibPathEnvItem = (char *)taosMemoryCalloc(ldLibEnvLen, 1); + if (ldLibPathEnvItem == NULL) { + err = terrno; + goto _OVER; } - char ldLibPathEnvItem[1024 + 32] = {0}; - snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib); + snprintf(ldLibPathEnvItem, ldLibEnvLen, "%s=%s", UDF_LIB_PATH_ENV, udfdPathLdLib); - 
char *taosFqdnEnvItem = NULL; char *taosFqdn = getenv("TAOS_FQDN"); if (taosFqdn != NULL) { int32_t subLen = strlen(taosFqdn); @@ -179,54 +237,107 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn); } else { fnError("[UDFD]Failed to allocate memory for TAOS_FQDN"); - return terrno; + err = terrno; + goto _OVER; } } char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL}; - char **envUdfdWithPEnv = NULL; - if (environ != NULL) { + // Inherit the parent process's environment so spawned udfd sees the same + // PATH / locale / config-related variables. Using libuv's portable + // ``uv_os_environ`` instead of POSIX ``extern char **environ`` lets this + // path work on Windows as well as Linux/macOS. + uv_env_item_t *uvEnvItems = NULL; + int uvEnvItemCount = 0; + int32_t uvEnvErr = uv_os_environ(&uvEnvItems, &uvEnvItemCount); + if (uvEnvErr == 0 && uvEnvItems != NULL && uvEnvItemCount > 0) { int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd); - int32_t numEnviron = 0; - while (environ[numEnviron] != NULL) { - numEnviron++; - } - - envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *)); + envUdfdWithPEnv = (char **)taosMemoryCalloc(uvEnvItemCount + lenEnvUdfd, sizeof(char *)); if (envUdfdWithPEnv == NULL) { err = TSDB_CODE_OUT_OF_MEMORY; + uv_os_free_environ(uvEnvItems, uvEnvItemCount); goto _OVER; } - for (int32_t i = 0; i < numEnviron; i++) { - int32_t len = strlen(environ[i]) + 1; - envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1); - if (envUdfdWithPEnv[i] == NULL) { + // Names we'll be appending below. Skip these when copying the inherited + // env so the spawned udfd sees our overrides instead of duplicate keys. + // On Windows env-var names are case-insensitive (notably PATH); POSIX is + // case-sensitive. osDef.h maps strncasecmp -> _strnicmp on Windows. 
+#ifdef WINDOWS + int (*envNameCmp)(const char *, const char *, size_t) = strncasecmp; +#else + int (*envNameCmp)(const char *, const char *, size_t) = strncmp; +#endif + const char *overrideNames[] = {"DNODE_ID", "UV_THREADPOOL_SIZE", UDF_LIB_PATH_ENV, "TAOS_FQDN"}; + size_t overrideNameLens[ARRAY_SIZE(overrideNames)]; + for (size_t i = 0; i < ARRAY_SIZE(overrideNames); i++) { + overrideNameLens[i] = strlen(overrideNames[i]); + } + + int32_t outIdx = 0; + for (int i = 0; i < uvEnvItemCount; i++) { + const char *name = uvEnvItems[i].name; + size_t nameLen = strlen(name); + bool skip = false; + for (size_t k = 0; k < ARRAY_SIZE(overrideNames); k++) { + if (nameLen == overrideNameLens[k] && envNameCmp(name, overrideNames[k], nameLen) == 0) { + skip = true; + break; + } + } + if (skip) continue; + size_t valueLen = strlen(uvEnvItems[i].value); + size_t len = nameLen + 1 /* '=' */ + valueLen + 1 /* '\0' */; + envUdfdWithPEnv[outIdx] = (char *)taosMemoryCalloc(len, 1); + if (envUdfdWithPEnv[outIdx] == NULL) { err = TSDB_CODE_OUT_OF_MEMORY; + uv_os_free_environ(uvEnvItems, uvEnvItemCount); goto _OVER; } - - tstrncpy(envUdfdWithPEnv[i], environ[i], len); + snprintf(envUdfdWithPEnv[outIdx], len, "%s=%s", name, uvEnvItems[i].value); + outIdx++; } + uv_os_free_environ(uvEnvItems, uvEnvItemCount); + uvEnvItems = NULL; for (int32_t i = 0; i < lenEnvUdfd; i++) { if (envUdfd[i] != NULL) { - int32_t len = strlen(envUdfd[i]) + 1; - envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1); - if (envUdfdWithPEnv[numEnviron + i] == NULL) { + size_t len = strlen(envUdfd[i]) + 1; + envUdfdWithPEnv[outIdx] = (char *)taosMemoryCalloc(len, 1); + if (envUdfdWithPEnv[outIdx] == NULL) { err = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } - - tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len); + tstrncpy(envUdfdWithPEnv[outIdx], envUdfd[i], len); + outIdx++; } } - envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL; + envUdfdWithPEnv[outIdx] = NULL; options.env = envUdfdWithPEnv; } 
else { - options.env = envUdfd; + if (uvEnvItems != NULL) { + uv_os_free_environ(uvEnvItems, uvEnvItemCount); + } + // uv_os_environ failure is extremely rare; if it happens, fail fast rather + // than spawn udfd with a minimal env that drops PATH/locale and could lead + // to silently wrong behavior (e.g. loading the wrong .so). + fnError("failed to read process environment for udfd: %s", + uvEnvErr != 0 ? uv_strerror(uvEnvErr) : "empty environment"); + err = uvEnvErr != 0 ? uvEnvErr : TSDB_CODE_FAILED; + goto _OVER; + } + + // ipc=0: this is a one-way control pipe used only to detect parent death + // via read EOF. ipc=1 forces a libuv pid-handshake on Windows that fails + // for child-stdio pipes. Initialized here, after env construction, so any + // failure path above can goto _OVER without a stranded loop handle. + int32_t pipeInitErr = uv_pipe_init(&pData->loop, &pData->ctrlPipe, 0); + if (pipeInitErr != 0) { + fnError("uv_pipe_init failed: %s", uv_strerror(pipeInitErr)); + err = TSDB_CODE_UDF_UV_EXEC_FAILURE; + goto _OVER; } err = uv_spawn(&pData->loop, &pData->process, &options); @@ -258,9 +369,18 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { } _OVER: + if (pathTaosdLdLibHeap) { + taosMemoryFree(pathTaosdLdLibHeap); + } + if (udfdPathLdLib) { + taosMemoryFree(udfdPathLdLib); + } if (taosFqdnEnvItem) { taosMemoryFree(taosFqdnEnvItem); } + if (ldLibPathEnvItem) { + taosMemoryFree(ldLibPathEnvItem); + } if (envUdfdWithPEnv != NULL) { int32_t i = 0; @@ -311,6 +431,12 @@ static void udfWatchUdfd(void *args) { if (terrno != 0) { (void)uv_barrier_wait(&pData->barrier); atomic_store_32(&pData->spawnErr, terrno); + // Any handle initialized before failure (stopAsync, ctrlPipe from a + // partial udfSpawnUdfd) must be closed and drained, otherwise + // uv_loop_close fails and leaks the loop's internal state. 
+ uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL); + int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT); + fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__); if (uv_loop_close(&pData->loop) != 0) { fnError("udfd loop close failed, lino:%d", __LINE__); } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index bc6258a9f29d..cb4d528db7e1 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -401,8 +401,13 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { TAOS_UDF_CHECK_PTR_RCODE(plugin); plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; - // todo: windows support +#ifdef WINDOWS + snprintf(plugin->libPath, PATH_MAX, "%s", "taospyudf.dll"); +#elif defined(_TD_DARWIN_64) + snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.dylib"); +#else snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.so"); +#endif plugin->libLoaded = false; const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit", "pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart", @@ -1058,22 +1063,22 @@ void udfdGetFuncBodyPath(const SUdf *udf, char *path) { TAOS_UDF_CHECK_PTR_RVOID(udf, path); if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { #ifdef WINDOWS - snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s" TD_DIRSEP "%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, + udf->createdTime); #else - snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version, + snprintf(path, PATH_MAX, "%s" TD_DIRSEP "lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version, udf->createdTime); #endif } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { -#ifdef WINDOWS - snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, 
udf->version, udf->createdTime); -#else - snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime); -#endif + snprintf(path, PATH_MAX, "%s" TD_DIRSEP "%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, + udf->createdTime); } else { #ifdef WINDOWS - snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s" TD_DIRSEP "%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, + udf->createdTime); #else - snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s" TD_DIRSEP "lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, + udf->createdTime); #endif } } @@ -1633,7 +1638,9 @@ static int32_t udfdUvInit() { TAOS_CHECK_RETURN(uv_loop_init(global.loop)); if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit - TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 1)); + // ipc=0 — see tudf.c counterpart. Control pipe is used only for + // parent-death EOF; ipc=1 fails uv_pipe_open on Windows. + TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 0)); TAOS_CHECK_RETURN(uv_pipe_open(&global.ctrlPipe, 0)); TAOS_CHECK_RETURN(uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb)); }