Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/en/14-reference/01-components/01-taosd.md
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ The effective value of charset is UTF-8.
| mqRebalanceInterval | | Supported, effective immediately | Internal parameter, interval for consumer rebalancing |
| uptimeInterval | | Supported, effective immediately | Internal parameter, for recording system uptime |
| timeseriesThreshold | | Supported, effective immediately | Internal parameter, for usage statistics |
| udf | | Supported, effective after restart | Whether to start UDF service; 0: do not start, 1: start; default value 1(The default value on Windows is 0.) |
| udf | | Supported, effective after restart | Whether to start UDF service; 0: do not start, 1: start; default value 1 |
| udfdResFuncs | | Supported, effective after restart | Internal parameter, for setting UDF result sets|
| udfdLdLibPath | | Supported, effective after restart | Internal parameter, indicates the library path for loading UDF |
| streamBatchRequestWaitMs | | Supported, effective after restart | Stream computing batch request wait time, range 0-1800000, in milliseconds, default value 5000 |
Expand Down
2 changes: 1 addition & 1 deletion docs/zh/14-reference/01-components/01-taosd.md
Original file line number Diff line number Diff line change
Expand Up @@ -1857,7 +1857,7 @@ charset 的有效值是 UTF-8。

- 说明:是否启动 UDF 服务
- 类型:整数;0:不启动,1:启动。
- 默认值:1(windows 上默认值为 0)
- 默认值:1
- 最小值:0
- 最大值:1
- 参数类型:全局配置参数
Expand Down
6 changes: 3 additions & 3 deletions source/common/src/tglobal.c
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,10 @@ bool tsIfAdtFse = false; // ADT-FSE algorithom or origina
char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR

// udf
#if defined(WINDOWS) || !defined(USE_UDF)
bool tsStartUdfd = false;
#else
#ifdef USE_UDF
bool tsStartUdfd = true;
#else
bool tsStartUdfd = false;
#endif
Comment thread
stephenkgu marked this conversation as resolved.

Comment thread
stephenkgu marked this conversation as resolved.
// wal
Expand Down
4 changes: 0 additions & 4 deletions source/dnode/mnode/impl/src/mndFunc.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,6 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {

TAOS_CHECK_GOTO(tDeserializeSCreateFuncReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

#ifdef WINDOWS
code = TSDB_CODE_MND_INVALID_PLATFORM;
goto _OVER;
#endif
mInfo("func:%s, start to create, size:%d", createReq.name, createReq.codeLen);
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_FUNC), NULL, _OVER);

Expand Down
2 changes: 1 addition & 1 deletion source/libs/executor/src/projectoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult,
int32_t code = TSDB_CODE_SUCCESS;

SExecTaskInfo* savedTaskInfo = gTaskScalarExtra.pTaskInfo;
__typeof__(gTaskScalarExtra.isTaskKilled) savedIsTaskKilled = gTaskScalarExtra.isTaskKilled;
sclIsTaskKilled savedIsTaskKilled = gTaskScalarExtra.isTaskKilled;

if (pTaskInfo != NULL) {
gTaskScalarExtra.pTaskInfo = pTaskInfo;
Expand Down
182 changes: 142 additions & 40 deletions source/libs/function/src/tudf.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,17 @@ SUdfdData udfdGlobal = {0};
int32_t udfStartUdfd(int32_t startDnodeId);
void udfStopUdfd();

extern char **environ;
/* Note: udfd spawning now uses libuv's portable ``uv_os_environ`` instead
* of the POSIX ``extern char **environ`` symbol so the same code works on
* Windows. */

#ifdef WINDOWS
#define UDF_LIB_PATH_ENV "PATH"
#define UDF_LIB_PATH_SEP ';'
#else
#define UDF_LIB_PATH_ENV "LD_LIBRARY_PATH"
#define UDF_LIB_PATH_SEP ':'
#endif

static int32_t udfSpawnUdfd(SUdfdData *pData);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal);
Expand Down Expand Up @@ -87,6 +97,11 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {

int32_t err = 0;
uv_process_options_t options = {0};
char *pathTaosdLdLibHeap = NULL;
char *udfdPathLdLib = NULL;
char *ldLibPathEnvItem = NULL;
char *taosFqdnEnvItem = NULL;
char **envUdfdWithPEnv = NULL;

char path[PATH_MAX] = {0};
if (tsProcPath == NULL) {
Expand Down Expand Up @@ -121,10 +136,17 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {

options.exit_cb = udfUdfdExit;

TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));
// ipc=0: this is a one-way control pipe used only to detect parent
// death via read EOF. No handle-passing is needed. ipc=1 forces a
// libuv pid-handshake on Windows that fails for child-stdio pipes.
TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 0));

uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
// UV_OVERLAPPED_PIPE is required on Windows: without it libuv uses
// CreatePipe() which yields a non-overlapped anonymous pipe, and the
// child's uv_pipe_open(fd 0) fails because libuv's IOCP machinery
// requires FILE_FLAG_OVERLAPPED. The flag is a no-op on POSIX.
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_OVERLAPPED_PIPE;
Comment thread
stephenkgu marked this conversation as resolved.
child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
child_stdio[1].flags = UV_IGNORE;
child_stdio[2].flags = UV_INHERIT_FD;
Expand All @@ -146,28 +168,57 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
numCpuCores = TMAX(numCpuCores, 2);
snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2);

char pathTaosdLdLib[512] = {0};
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib);
int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
if (ret != UV_ENOBUFS) {
char pathTaosdLdLibStack[512] = {0};
char *pathTaosdLdLib = pathTaosdLdLibStack;
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLibStack);
int32_t ret = uv_os_getenv(UDF_LIB_PATH_ENV, pathTaosdLdLib, &taosdLdLibPathLen);
if (ret == UV_ENOBUFS) {
// taosdLdLibPathLen now holds the required buffer size (incl. NUL).
pathTaosdLdLibHeap = (char *)taosMemoryCalloc(taosdLdLibPathLen, 1);
if (pathTaosdLdLibHeap == NULL) {
err = terrno;
goto _OVER;
}
pathTaosdLdLib = pathTaosdLdLibHeap;
ret = uv_os_getenv(UDF_LIB_PATH_ENV, pathTaosdLdLib, &taosdLdLibPathLen);
if (ret != 0) {
pathTaosdLdLib[0] = '\0';
taosdLdLibPathLen = 0;
} else {
taosdLdLibPathLen = strlen(pathTaosdLdLib);
}
} else if (ret != 0) {
pathTaosdLdLib[0] = '\0';
taosdLdLibPathLen = 0;
} else {
taosdLdLibPathLen = strlen(pathTaosdLdLib);
}

char udfdPathLdLib[1024] = {0};
size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath);
tstrncpy(udfdPathLdLib, tsUdfdLdLibPath, sizeof(udfdPathLdLib));

udfdPathLdLib[udfdLdLibPathLen] = ':';
tstrncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen - 1);
if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) {
fnInfo("udfd LD_LIBRARY_PATH: %s", udfdPathLdLib);
size_t joinedLen = udfdLdLibPathLen + taosdLdLibPathLen + 2; // sep + NUL
udfdPathLdLib = (char *)taosMemoryCalloc(joinedLen, 1);
if (udfdPathLdLib == NULL) {
err = terrno;
goto _OVER;
}
Comment thread
stephenkgu marked this conversation as resolved.
if (udfdLdLibPathLen > 0 && taosdLdLibPathLen > 0) {
snprintf(udfdPathLdLib, joinedLen, "%s%c%s",
tsUdfdLdLibPath, UDF_LIB_PATH_SEP, pathTaosdLdLib);
} else if (udfdLdLibPathLen > 0) {
tstrncpy(udfdPathLdLib, tsUdfdLdLibPath, joinedLen);
} else {
fnError("can not set correct udfd LD_LIBRARY_PATH");
tstrncpy(udfdPathLdLib, pathTaosdLdLib, joinedLen);
}
fnInfo("udfd %s: %s", UDF_LIB_PATH_ENV, udfdPathLdLib);

size_t ldLibEnvLen = strlen(UDF_LIB_PATH_ENV) + 1 /* '=' */ + strlen(udfdPathLdLib) + 1;
ldLibPathEnvItem = (char *)taosMemoryCalloc(ldLibEnvLen, 1);
if (ldLibPathEnvItem == NULL) {
err = terrno;
goto _OVER;
}
char ldLibPathEnvItem[1024 + 32] = {0};
snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib);
snprintf(ldLibPathEnvItem, ldLibEnvLen, "%s=%s", UDF_LIB_PATH_ENV, udfdPathLdLib);
Comment thread
stephenkgu marked this conversation as resolved.

char *taosFqdnEnvItem = NULL;
char *taosFqdn = getenv("TAOS_FQDN");
if (taosFqdn != NULL) {
int32_t subLen = strlen(taosFqdn);
Expand All @@ -179,54 +230,96 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn);
} else {
fnError("[UDFD]Failed to allocate memory for TAOS_FQDN");
return terrno;
err = terrno;
goto _OVER;
}
}

char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL};

char **envUdfdWithPEnv = NULL;
if (environ != NULL) {
// Inherit the parent process's environment so spawned udfd sees the same
// PATH / locale / config-related variables. Using libuv's portable
// ``uv_os_environ`` instead of POSIX ``extern char **environ`` lets this
// path work on Windows as well as Linux/macOS.
uv_env_item_t *uvEnvItems = NULL;
int uvEnvItemCount = 0;
int32_t uvEnvErr = uv_os_environ(&uvEnvItems, &uvEnvItemCount);
if (uvEnvErr == 0 && uvEnvItems != NULL && uvEnvItemCount > 0) {
int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd);
int32_t numEnviron = 0;
while (environ[numEnviron] != NULL) {
numEnviron++;
}

envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *));
envUdfdWithPEnv = (char **)taosMemoryCalloc(uvEnvItemCount + lenEnvUdfd, sizeof(char *));
Comment thread
stephenkgu marked this conversation as resolved.
if (envUdfdWithPEnv == NULL) {
err = TSDB_CODE_OUT_OF_MEMORY;
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
goto _OVER;
}

for (int32_t i = 0; i < numEnviron; i++) {
int32_t len = strlen(environ[i]) + 1;
envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[i] == NULL) {
// Names we'll be appending below. Skip these when copying the inherited
// env so the spawned udfd sees our overrides instead of duplicate keys.
// On Windows env-var names are case-insensitive (notably PATH); POSIX is
// case-sensitive. osDef.h maps strncasecmp -> _strnicmp on Windows.
#ifdef WINDOWS
int (*envNameCmp)(const char *, const char *, size_t) = strncasecmp;
Comment thread
stephenkgu marked this conversation as resolved.
#else
int (*envNameCmp)(const char *, const char *, size_t) = strncmp;
#endif
const char *overrideNames[] = {"DNODE_ID", "UV_THREADPOOL_SIZE", UDF_LIB_PATH_ENV, "TAOS_FQDN"};
size_t overrideNameLens[ARRAY_SIZE(overrideNames)];
for (size_t i = 0; i < ARRAY_SIZE(overrideNames); i++) {
overrideNameLens[i] = strlen(overrideNames[i]);
}

int32_t outIdx = 0;
for (int i = 0; i < uvEnvItemCount; i++) {
const char *name = uvEnvItems[i].name;
size_t nameLen = strlen(name);
bool skip = false;
for (size_t k = 0; k < ARRAY_SIZE(overrideNames); k++) {
if (nameLen == overrideNameLens[k] && envNameCmp(name, overrideNames[k], nameLen) == 0) {
skip = true;
break;
}
}
if (skip) continue;
size_t valueLen = strlen(uvEnvItems[i].value);
size_t len = nameLen + 1 /* '=' */ + valueLen + 1 /* '\0' */;
envUdfdWithPEnv[outIdx] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[outIdx] == NULL) {
err = TSDB_CODE_OUT_OF_MEMORY;
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
goto _OVER;
}

tstrncpy(envUdfdWithPEnv[i], environ[i], len);
snprintf(envUdfdWithPEnv[outIdx], len, "%s=%s", name, uvEnvItems[i].value);
outIdx++;
}
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
uvEnvItems = NULL;

for (int32_t i = 0; i < lenEnvUdfd; i++) {
if (envUdfd[i] != NULL) {
int32_t len = strlen(envUdfd[i]) + 1;
envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[numEnviron + i] == NULL) {
size_t len = strlen(envUdfd[i]) + 1;
envUdfdWithPEnv[outIdx] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[outIdx] == NULL) {
Comment thread
stephenkgu marked this conversation as resolved.
err = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}

tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len);
tstrncpy(envUdfdWithPEnv[outIdx], envUdfd[i], len);
outIdx++;
}
}
envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL;
envUdfdWithPEnv[outIdx] = NULL;

options.env = envUdfdWithPEnv;
} else {
options.env = envUdfd;
if (uvEnvItems != NULL) {
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
}
// uv_os_environ failure is extremely rare; if it happens, fail fast rather
// than spawn udfd with a minimal env that drops PATH/locale and could lead
// to silently wrong behavior (e.g. loading the wrong .so).
fnError("failed to read process environment for udfd: %s",
uvEnvErr != 0 ? uv_strerror(uvEnvErr) : "empty environment");
err = uvEnvErr != 0 ? uvEnvErr : TSDB_CODE_FAILED;
goto _OVER;
Comment thread
stephenkgu marked this conversation as resolved.
}

err = uv_spawn(&pData->loop, &pData->process, &options);
Expand Down Expand Up @@ -258,9 +351,18 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
}

_OVER:
if (pathTaosdLdLibHeap) {
taosMemoryFree(pathTaosdLdLibHeap);
}
if (udfdPathLdLib) {
taosMemoryFree(udfdPathLdLib);
}
if (taosFqdnEnvItem) {
taosMemoryFree(taosFqdnEnvItem);
}
if (ldLibPathEnvItem) {
taosMemoryFree(ldLibPathEnvItem);
}

if (envUdfdWithPEnv != NULL) {
int32_t i = 0;
Expand Down
29 changes: 18 additions & 11 deletions source/libs/function/src/udfd.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,13 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[],
int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
TAOS_UDF_CHECK_PTR_RCODE(plugin);
plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON;
// todo: windows support
#ifdef WINDOWS
snprintf(plugin->libPath, PATH_MAX, "%s", "taospyudf.dll");
Comment on lines +404 to +405
#elif defined(_TD_DARWIN_64)
snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.dylib");
#else
snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.so");
#endif
plugin->libLoaded = false;
const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit",
"pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart",
Expand Down Expand Up @@ -1058,22 +1063,22 @@ void udfdGetFuncBodyPath(const SUdf *udf, char *path) {
TAOS_UDF_CHECK_PTR_RVOID(udf, path);
if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
snprintf(path, PATH_MAX, "%s" TD_DIRSEP "%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version,
udf->createdTime);
#else
Comment thread
stephenkgu marked this conversation as resolved.
snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version,
snprintf(path, PATH_MAX, "%s" TD_DIRSEP "lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version,
udf->createdTime);
#endif
} else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
#endif
snprintf(path, PATH_MAX, "%s" TD_DIRSEP "%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version,
udf->createdTime);
} else {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
snprintf(path, PATH_MAX, "%s" TD_DIRSEP "%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version,
udf->createdTime);
#else
Comment thread
stephenkgu marked this conversation as resolved.
snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
snprintf(path, PATH_MAX, "%s" TD_DIRSEP "lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version,
udf->createdTime);
#endif
}
}
Expand Down Expand Up @@ -1633,7 +1638,9 @@ static int32_t udfdUvInit() {
TAOS_CHECK_RETURN(uv_loop_init(global.loop));

if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit
TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 1));
// ipc=0 — see tudf.c counterpart. Control pipe is used only for
// parent-death EOF; ipc=1 fails uv_pipe_open on Windows.
TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 0));
TAOS_CHECK_RETURN(uv_pipe_open(&global.ctrlPipe, 0));
TAOS_CHECK_RETURN(uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb));
}
Expand Down
Loading