Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/en/14-reference/01-components/01-taosd.md
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ The effective value of charset is UTF-8.
| mqRebalanceInterval | | Supported, effective immediately | Internal parameter, interval for consumer rebalancing |
| uptimeInterval | | Supported, effective immediately | Internal parameter, for recording system uptime |
| timeseriesThreshold | | Supported, effective immediately | Internal parameter, for usage statistics |
| udf | | Supported, effective after restart | Whether to start UDF service; 0: do not start, 1: start; default value 1(The default value on Windows is 0.) |
| udf | | Supported, effective after restart | Whether to start UDF service; 0: do not start, 1: start; default value 1 |
| udfdResFuncs | | Supported, effective after restart | Internal parameter, for setting UDF result sets|
| udfdLdLibPath | | Supported, effective after restart | Internal parameter, indicates the library path for loading UDF |
| streamBatchRequestWaitMs | | Supported, effective after restart | Stream computing batch request wait time, range 0-1800000, in milliseconds, default value 5000 |
Expand Down
2 changes: 1 addition & 1 deletion docs/zh/14-reference/01-components/01-taosd.md
Original file line number Diff line number Diff line change
Expand Up @@ -1857,7 +1857,7 @@ charset 的有效值是 UTF-8。

- 说明:是否启动 UDF 服务
- 类型:整数;0:不启动,1:启动。
- 默认值:1(windows 上默认值为 0)
- 默认值:1
- 最小值:0
- 最大值:1
- 参数类型:全局配置参数
Expand Down
6 changes: 3 additions & 3 deletions source/common/src/tglobal.c
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,10 @@ bool tsIfAdtFse = false; // ADT-FSE algorithom or origina
char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR

// udf
#if defined(WINDOWS) || !defined(USE_UDF)
bool tsStartUdfd = false;
#else
#ifdef USE_UDF
bool tsStartUdfd = true;
#else
bool tsStartUdfd = false;
#endif
Comment thread
stephenkgu marked this conversation as resolved.

Comment thread
stephenkgu marked this conversation as resolved.
// wal
Expand Down
4 changes: 0 additions & 4 deletions source/dnode/mnode/impl/src/mndFunc.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,6 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {

TAOS_CHECK_GOTO(tDeserializeSCreateFuncReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

#ifdef WINDOWS
code = TSDB_CODE_MND_INVALID_PLATFORM;
goto _OVER;
#endif
mInfo("func:%s, start to create, size:%d", createReq.name, createReq.codeLen);
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_FUNC), NULL, _OVER);

Expand Down
2 changes: 1 addition & 1 deletion source/libs/executor/src/projectoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult,
int32_t code = TSDB_CODE_SUCCESS;

SExecTaskInfo* savedTaskInfo = gTaskScalarExtra.pTaskInfo;
__typeof__(gTaskScalarExtra.isTaskKilled) savedIsTaskKilled = gTaskScalarExtra.isTaskKilled;
sclIsTaskKilled savedIsTaskKilled = gTaskScalarExtra.isTaskKilled;

if (pTaskInfo != NULL) {
gTaskScalarExtra.pTaskInfo = pTaskInfo;
Expand Down
206 changes: 166 additions & 40 deletions source/libs/function/src/tudf.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,17 @@ SUdfdData udfdGlobal = {0};
int32_t udfStartUdfd(int32_t startDnodeId);
void udfStopUdfd();

extern char **environ;
/* Note: udfd spawning now uses libuv's portable ``uv_os_environ`` instead
* of the POSIX ``extern char **environ`` symbol so the same code works on
* Windows. */

#ifdef WINDOWS
#define UDF_LIB_PATH_ENV "PATH"
#define UDF_LIB_PATH_SEP ';'
#else
#define UDF_LIB_PATH_ENV "LD_LIBRARY_PATH"
#define UDF_LIB_PATH_SEP ':'
#endif

static int32_t udfSpawnUdfd(SUdfdData *pData);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal);
Expand Down Expand Up @@ -87,6 +97,11 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {

int32_t err = 0;
uv_process_options_t options = {0};
char *pathTaosdLdLibHeap = NULL;
char *udfdPathLdLib = NULL;
char *ldLibPathEnvItem = NULL;
char *taosFqdnEnvItem = NULL;
char **envUdfdWithPEnv = NULL;

char path[PATH_MAX] = {0};
if (tsProcPath == NULL) {
Expand Down Expand Up @@ -121,10 +136,24 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {

options.exit_cb = udfUdfdExit;

TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));

// Stdio container references &pData->ctrlPipe by pointer; the pointer is
// stable across this function regardless of when the handle is initialized.
// We defer uv_pipe_init until just before uv_spawn (below) so that none of
// the env-construction failure paths above can goto _OVER with an
// already-initialized loop handle that would later trip uv_loop_close.
uv_stdio_container_t child_stdio[3];
// UV_OVERLAPPED_PIPE is required on Windows: without it libuv uses
// CreatePipe() which yields a non-overlapped anonymous pipe, and the
// child's uv_pipe_open(fd 0) fails because libuv's IOCP machinery
// requires FILE_FLAG_OVERLAPPED. On POSIX the same flag is aliased to
// UV_NONBLOCK_PIPE in libuv >=1.49 -- adding it would put the child's
// stdin into non-blocking mode and risk data loss for an unaware child,
// so we only set it on Windows.
#ifdef WINDOWS
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_OVERLAPPED_PIPE;
Comment thread
stephenkgu marked this conversation as resolved.
#else
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
#endif
child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
child_stdio[1].flags = UV_IGNORE;
child_stdio[2].flags = UV_INHERIT_FD;
Expand All @@ -146,28 +175,57 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
numCpuCores = TMAX(numCpuCores, 2);
snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2);

char pathTaosdLdLib[512] = {0};
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib);
int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
if (ret != UV_ENOBUFS) {
char pathTaosdLdLibStack[512] = {0};
char *pathTaosdLdLib = pathTaosdLdLibStack;
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLibStack);
int32_t ret = uv_os_getenv(UDF_LIB_PATH_ENV, pathTaosdLdLib, &taosdLdLibPathLen);
if (ret == UV_ENOBUFS) {
// taosdLdLibPathLen now holds the required buffer size (incl. NUL).
pathTaosdLdLibHeap = (char *)taosMemoryCalloc(taosdLdLibPathLen, 1);
if (pathTaosdLdLibHeap == NULL) {
err = terrno;
goto _OVER;
}
pathTaosdLdLib = pathTaosdLdLibHeap;
ret = uv_os_getenv(UDF_LIB_PATH_ENV, pathTaosdLdLib, &taosdLdLibPathLen);
if (ret != 0) {
pathTaosdLdLib[0] = '\0';
taosdLdLibPathLen = 0;
} else {
taosdLdLibPathLen = strlen(pathTaosdLdLib);
}
} else if (ret != 0) {
pathTaosdLdLib[0] = '\0';
taosdLdLibPathLen = 0;
} else {
taosdLdLibPathLen = strlen(pathTaosdLdLib);
}

char udfdPathLdLib[1024] = {0};
size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath);
tstrncpy(udfdPathLdLib, tsUdfdLdLibPath, sizeof(udfdPathLdLib));

udfdPathLdLib[udfdLdLibPathLen] = ':';
tstrncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen - 1);
if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) {
fnInfo("udfd LD_LIBRARY_PATH: %s", udfdPathLdLib);
size_t joinedLen = udfdLdLibPathLen + taosdLdLibPathLen + 2; // sep + NUL
udfdPathLdLib = (char *)taosMemoryCalloc(joinedLen, 1);
if (udfdPathLdLib == NULL) {
err = terrno;
goto _OVER;
}
Comment thread
stephenkgu marked this conversation as resolved.
if (udfdLdLibPathLen > 0 && taosdLdLibPathLen > 0) {
snprintf(udfdPathLdLib, joinedLen, "%s%c%s",
tsUdfdLdLibPath, UDF_LIB_PATH_SEP, pathTaosdLdLib);
} else if (udfdLdLibPathLen > 0) {
tstrncpy(udfdPathLdLib, tsUdfdLdLibPath, joinedLen);
} else {
fnError("can not set correct udfd LD_LIBRARY_PATH");
tstrncpy(udfdPathLdLib, pathTaosdLdLib, joinedLen);
}
fnInfo("udfd %s: %s", UDF_LIB_PATH_ENV, udfdPathLdLib);

size_t ldLibEnvLen = strlen(UDF_LIB_PATH_ENV) + 1 /* '=' */ + strlen(udfdPathLdLib) + 1;
ldLibPathEnvItem = (char *)taosMemoryCalloc(ldLibEnvLen, 1);
if (ldLibPathEnvItem == NULL) {
err = terrno;
goto _OVER;
}
char ldLibPathEnvItem[1024 + 32] = {0};
snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib);
snprintf(ldLibPathEnvItem, ldLibEnvLen, "%s=%s", UDF_LIB_PATH_ENV, udfdPathLdLib);
Comment thread
stephenkgu marked this conversation as resolved.

char *taosFqdnEnvItem = NULL;
char *taosFqdn = getenv("TAOS_FQDN");
if (taosFqdn != NULL) {
int32_t subLen = strlen(taosFqdn);
Expand All @@ -179,54 +237,107 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn);
} else {
fnError("[UDFD]Failed to allocate memory for TAOS_FQDN");
return terrno;
err = terrno;
goto _OVER;
}
}

char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL};

char **envUdfdWithPEnv = NULL;
if (environ != NULL) {
// Inherit the parent process's environment so spawned udfd sees the same
// PATH / locale / config-related variables. Using libuv's portable
// ``uv_os_environ`` instead of POSIX ``extern char **environ`` lets this
// path work on Windows as well as Linux/macOS.
uv_env_item_t *uvEnvItems = NULL;
int uvEnvItemCount = 0;
int32_t uvEnvErr = uv_os_environ(&uvEnvItems, &uvEnvItemCount);
if (uvEnvErr == 0 && uvEnvItems != NULL && uvEnvItemCount > 0) {
int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd);
int32_t numEnviron = 0;
while (environ[numEnviron] != NULL) {
numEnviron++;
}

envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *));
envUdfdWithPEnv = (char **)taosMemoryCalloc(uvEnvItemCount + lenEnvUdfd, sizeof(char *));
Comment thread
stephenkgu marked this conversation as resolved.
if (envUdfdWithPEnv == NULL) {
err = TSDB_CODE_OUT_OF_MEMORY;
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
goto _OVER;
}

for (int32_t i = 0; i < numEnviron; i++) {
int32_t len = strlen(environ[i]) + 1;
envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[i] == NULL) {
// Names we'll be appending below. Skip these when copying the inherited
// env so the spawned udfd sees our overrides instead of duplicate keys.
// On Windows env-var names are case-insensitive (notably PATH); POSIX is
// case-sensitive. osDef.h maps strncasecmp -> _strnicmp on Windows.
#ifdef WINDOWS
int (*envNameCmp)(const char *, const char *, size_t) = strncasecmp;
#else
int (*envNameCmp)(const char *, const char *, size_t) = strncmp;
#endif
const char *overrideNames[] = {"DNODE_ID", "UV_THREADPOOL_SIZE", UDF_LIB_PATH_ENV, "TAOS_FQDN"};
size_t overrideNameLens[ARRAY_SIZE(overrideNames)];
for (size_t i = 0; i < ARRAY_SIZE(overrideNames); i++) {
overrideNameLens[i] = strlen(overrideNames[i]);
}

int32_t outIdx = 0;
for (int i = 0; i < uvEnvItemCount; i++) {
const char *name = uvEnvItems[i].name;
size_t nameLen = strlen(name);
bool skip = false;
for (size_t k = 0; k < ARRAY_SIZE(overrideNames); k++) {
if (nameLen == overrideNameLens[k] && envNameCmp(name, overrideNames[k], nameLen) == 0) {
skip = true;
break;
}
}
if (skip) continue;
size_t valueLen = strlen(uvEnvItems[i].value);
size_t len = nameLen + 1 /* '=' */ + valueLen + 1 /* '\0' */;
envUdfdWithPEnv[outIdx] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[outIdx] == NULL) {
err = TSDB_CODE_OUT_OF_MEMORY;
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
goto _OVER;
}

tstrncpy(envUdfdWithPEnv[i], environ[i], len);
snprintf(envUdfdWithPEnv[outIdx], len, "%s=%s", name, uvEnvItems[i].value);
outIdx++;
}
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
uvEnvItems = NULL;

for (int32_t i = 0; i < lenEnvUdfd; i++) {
if (envUdfd[i] != NULL) {
int32_t len = strlen(envUdfd[i]) + 1;
envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[numEnviron + i] == NULL) {
size_t len = strlen(envUdfd[i]) + 1;
envUdfdWithPEnv[outIdx] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[outIdx] == NULL) {
Comment thread
stephenkgu marked this conversation as resolved.
err = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}

tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len);
tstrncpy(envUdfdWithPEnv[outIdx], envUdfd[i], len);
outIdx++;
}
}
envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL;
envUdfdWithPEnv[outIdx] = NULL;

options.env = envUdfdWithPEnv;
} else {
options.env = envUdfd;
if (uvEnvItems != NULL) {
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
}
// uv_os_environ failure is extremely rare; if it happens, fail fast rather
// than spawn udfd with a minimal env that drops PATH/locale and could lead
// to silently wrong behavior (e.g. loading the wrong .so).
fnError("failed to read process environment for udfd: %s",
uvEnvErr != 0 ? uv_strerror(uvEnvErr) : "empty environment");
err = uvEnvErr != 0 ? uvEnvErr : TSDB_CODE_FAILED;
goto _OVER;
Comment thread
stephenkgu marked this conversation as resolved.
}

// ipc=0: this is a one-way control pipe used only to detect parent death
// via read EOF. ipc=1 forces a libuv pid-handshake on Windows that fails
// for child-stdio pipes. Initialized here, after env construction, so any
// failure path above can goto _OVER without a stranded loop handle.
int32_t pipeInitErr = uv_pipe_init(&pData->loop, &pData->ctrlPipe, 0);
if (pipeInitErr != 0) {
fnError("uv_pipe_init failed: %s", uv_strerror(pipeInitErr));
err = TSDB_CODE_UDF_UV_EXEC_FAILURE;
goto _OVER;
Comment on lines +336 to +340
}

err = uv_spawn(&pData->loop, &pData->process, &options);
Expand Down Expand Up @@ -258,9 +369,18 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
}

_OVER:
if (pathTaosdLdLibHeap) {
taosMemoryFree(pathTaosdLdLibHeap);
}
if (udfdPathLdLib) {
taosMemoryFree(udfdPathLdLib);
}
if (taosFqdnEnvItem) {
taosMemoryFree(taosFqdnEnvItem);
}
if (ldLibPathEnvItem) {
taosMemoryFree(ldLibPathEnvItem);
}

if (envUdfdWithPEnv != NULL) {
int32_t i = 0;
Expand Down Expand Up @@ -311,6 +431,12 @@ static void udfWatchUdfd(void *args) {
if (terrno != 0) {
(void)uv_barrier_wait(&pData->barrier);
atomic_store_32(&pData->spawnErr, terrno);
// Any handle initialized before failure (stopAsync, ctrlPipe from a
// partial udfSpawnUdfd) must be closed and drained, otherwise
// uv_loop_close fails and leaks the loop's internal state.
uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT);
fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
if (uv_loop_close(&pData->loop) != 0) {
fnError("udfd loop close failed, lino:%d", __LINE__);
}
Expand Down
Loading
Loading