Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions source/common/src/tglobal.c
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,10 @@ bool tsIfAdtFse = false; // ADT-FSE algorithom or origina
char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR

// udf
#if defined(WINDOWS) || !defined(USE_UDF)
bool tsStartUdfd = false;
#else
#ifdef USE_UDF
bool tsStartUdfd = true;
#else
bool tsStartUdfd = false;
#endif
Comment thread
stephenkgu marked this conversation as resolved.

Comment thread
stephenkgu marked this conversation as resolved.
// wal
Expand Down
4 changes: 0 additions & 4 deletions source/dnode/mnode/impl/src/mndFunc.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,6 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {

TAOS_CHECK_GOTO(tDeserializeSCreateFuncReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

#ifdef WINDOWS
code = TSDB_CODE_MND_INVALID_PLATFORM;
goto _OVER;
#endif
mInfo("func:%s, start to create, size:%d", createReq.name, createReq.codeLen);
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_FUNC), NULL, _OVER);

Expand Down
2 changes: 1 addition & 1 deletion source/libs/executor/src/projectoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult,
int32_t code = TSDB_CODE_SUCCESS;

SExecTaskInfo* savedTaskInfo = gTaskScalarExtra.pTaskInfo;
__typeof__(gTaskScalarExtra.isTaskKilled) savedIsTaskKilled = gTaskScalarExtra.isTaskKilled;
sclIsTaskKilled savedIsTaskKilled = gTaskScalarExtra.isTaskKilled;

if (pTaskInfo != NULL) {
gTaskScalarExtra.pTaskInfo = pTaskInfo;
Expand Down
171 changes: 133 additions & 38 deletions source/libs/function/src/tudf.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,17 @@ SUdfdData udfdGlobal = {0};
int32_t udfStartUdfd(int32_t startDnodeId);
void udfStopUdfd();

extern char **environ;
/* Note: udfd spawning now uses libuv's portable ``uv_os_environ`` instead
* of the POSIX ``extern char **environ`` symbol so the same code works on
* Windows. */

#ifdef WINDOWS
#define UDF_LIB_PATH_ENV "PATH"
#define UDF_LIB_PATH_SEP ';'
#else
#define UDF_LIB_PATH_ENV "LD_LIBRARY_PATH"
#define UDF_LIB_PATH_SEP ':'
#endif

static int32_t udfSpawnUdfd(SUdfdData *pData);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal);
Expand Down Expand Up @@ -87,6 +97,11 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {

int32_t err = 0;
uv_process_options_t options = {0};
char *pathTaosdLdLibHeap = NULL;
char *udfdPathLdLib = NULL;
char *ldLibPathEnvItem = NULL;
char *taosFqdnEnvItem = NULL;
char **envUdfdWithPEnv = NULL;

char path[PATH_MAX] = {0};
if (tsProcPath == NULL) {
Expand Down Expand Up @@ -121,10 +136,17 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {

options.exit_cb = udfUdfdExit;

TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));
// ipc=0: this is a one-way control pipe used only to detect parent
// death via read EOF. No handle-passing is needed. ipc=1 forces a
// libuv pid-handshake on Windows that fails for child-stdio pipes.
TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 0));

uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
// UV_OVERLAPPED_PIPE is required on Windows: without it libuv uses
// CreatePipe() which yields a non-overlapped anonymous pipe, and the
// child's uv_pipe_open(fd 0) fails because libuv's IOCP machinery
// requires FILE_FLAG_OVERLAPPED. The flag is a no-op on POSIX.
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_OVERLAPPED_PIPE;
Comment thread
stephenkgu marked this conversation as resolved.
child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
child_stdio[1].flags = UV_IGNORE;
child_stdio[2].flags = UV_INHERIT_FD;
Expand All @@ -146,28 +168,57 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
numCpuCores = TMAX(numCpuCores, 2);
snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2);

char pathTaosdLdLib[512] = {0};
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib);
int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
if (ret != UV_ENOBUFS) {
char pathTaosdLdLibStack[512] = {0};
char *pathTaosdLdLib = pathTaosdLdLibStack;
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLibStack);
int32_t ret = uv_os_getenv(UDF_LIB_PATH_ENV, pathTaosdLdLib, &taosdLdLibPathLen);
if (ret == UV_ENOBUFS) {
// taosdLdLibPathLen now holds the required buffer size (incl. NUL).
pathTaosdLdLibHeap = (char *)taosMemoryCalloc(taosdLdLibPathLen, 1);
if (pathTaosdLdLibHeap == NULL) {
err = terrno;
goto _OVER;
}
pathTaosdLdLib = pathTaosdLdLibHeap;
ret = uv_os_getenv(UDF_LIB_PATH_ENV, pathTaosdLdLib, &taosdLdLibPathLen);
if (ret != 0) {
pathTaosdLdLib[0] = '\0';
taosdLdLibPathLen = 0;
} else {
taosdLdLibPathLen = strlen(pathTaosdLdLib);
}
} else if (ret != 0) {
pathTaosdLdLib[0] = '\0';
taosdLdLibPathLen = 0;
} else {
taosdLdLibPathLen = strlen(pathTaosdLdLib);
}

char udfdPathLdLib[1024] = {0};
size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath);
tstrncpy(udfdPathLdLib, tsUdfdLdLibPath, sizeof(udfdPathLdLib));

udfdPathLdLib[udfdLdLibPathLen] = ':';
tstrncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen - 1);
if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) {
fnInfo("udfd LD_LIBRARY_PATH: %s", udfdPathLdLib);
size_t joinedLen = udfdLdLibPathLen + taosdLdLibPathLen + 2; // sep + NUL
udfdPathLdLib = (char *)taosMemoryCalloc(joinedLen, 1);
if (udfdPathLdLib == NULL) {
err = terrno;
goto _OVER;
}
Comment thread
stephenkgu marked this conversation as resolved.
if (udfdLdLibPathLen > 0 && taosdLdLibPathLen > 0) {
snprintf(udfdPathLdLib, joinedLen, "%s%c%s",
tsUdfdLdLibPath, UDF_LIB_PATH_SEP, pathTaosdLdLib);
} else if (udfdLdLibPathLen > 0) {
tstrncpy(udfdPathLdLib, tsUdfdLdLibPath, joinedLen);
} else {
fnError("can not set correct udfd LD_LIBRARY_PATH");
tstrncpy(udfdPathLdLib, pathTaosdLdLib, joinedLen);
}
char ldLibPathEnvItem[1024 + 32] = {0};
snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib);
fnInfo("udfd %s: %s", UDF_LIB_PATH_ENV, udfdPathLdLib);

size_t ldLibEnvLen = strlen(UDF_LIB_PATH_ENV) + 1 /* '=' */ + strlen(udfdPathLdLib) + 1;
ldLibPathEnvItem = (char *)taosMemoryCalloc(ldLibEnvLen, 1);
if (ldLibPathEnvItem == NULL) {
err = terrno;
goto _OVER;
}
snprintf(ldLibPathEnvItem, ldLibEnvLen, "%s=%s", UDF_LIB_PATH_ENV, udfdPathLdLib);
Comment thread
stephenkgu marked this conversation as resolved.

char *taosFqdnEnvItem = NULL;
char *taosFqdn = getenv("TAOS_FQDN");
if (taosFqdn != NULL) {
int32_t subLen = strlen(taosFqdn);
Expand All @@ -185,47 +236,82 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {

char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL};

char **envUdfdWithPEnv = NULL;
if (environ != NULL) {
// Inherit the parent process's environment so spawned udfd sees the same
// PATH / locale / config-related variables. Using libuv's portable
// ``uv_os_environ`` instead of POSIX ``extern char **environ`` lets this
// path work on Windows as well as Linux/macOS.
uv_env_item_t *uvEnvItems = NULL;
int uvEnvItemCount = 0;
int32_t uvEnvErr = uv_os_environ(&uvEnvItems, &uvEnvItemCount);
if (uvEnvErr == 0 && uvEnvItems != NULL && uvEnvItemCount > 0) {
int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd);
int32_t numEnviron = 0;
while (environ[numEnviron] != NULL) {
numEnviron++;
}

envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *));
envUdfdWithPEnv = (char **)taosMemoryCalloc(uvEnvItemCount + lenEnvUdfd, sizeof(char *));
Comment thread
stephenkgu marked this conversation as resolved.
if (envUdfdWithPEnv == NULL) {
err = TSDB_CODE_OUT_OF_MEMORY;
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
goto _OVER;
}

for (int32_t i = 0; i < numEnviron; i++) {
int32_t len = strlen(environ[i]) + 1;
envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[i] == NULL) {
// Names we'll be appending below. Skip these when copying the inherited
// env so the spawned udfd sees our overrides instead of duplicate keys.
// On Windows env-var names are case-insensitive (notably PATH); POSIX is
// case-sensitive.
#ifdef WINDOWS
int (*envNameCmp)(const char *, const char *, size_t) = _strnicmp;
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
#else
int (*envNameCmp)(const char *, const char *, size_t) = strncmp;
#endif
const char *overrideNames[] = {"DNODE_ID", "UV_THREADPOOL_SIZE", UDF_LIB_PATH_ENV, "TAOS_FQDN"};
size_t overrideNameLens[ARRAY_SIZE(overrideNames)];
for (size_t i = 0; i < ARRAY_SIZE(overrideNames); i++) {
overrideNameLens[i] = strlen(overrideNames[i]);
}

int32_t outIdx = 0;
for (int i = 0; i < uvEnvItemCount; i++) {
const char *name = uvEnvItems[i].name;
size_t nameLen = strlen(name);
bool skip = false;
for (size_t k = 0; k < ARRAY_SIZE(overrideNames); k++) {
if (nameLen == overrideNameLens[k] && envNameCmp(name, overrideNames[k], nameLen) == 0) {
skip = true;
break;
}
}
if (skip) continue;
size_t valueLen = strlen(uvEnvItems[i].value);
size_t len = nameLen + 1 /* '=' */ + valueLen + 1 /* '\0' */;
envUdfdWithPEnv[outIdx] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[outIdx] == NULL) {
err = TSDB_CODE_OUT_OF_MEMORY;
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
goto _OVER;
}

tstrncpy(envUdfdWithPEnv[i], environ[i], len);
snprintf(envUdfdWithPEnv[outIdx], len, "%s=%s", name, uvEnvItems[i].value);
outIdx++;
}
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
uvEnvItems = NULL;

for (int32_t i = 0; i < lenEnvUdfd; i++) {
if (envUdfd[i] != NULL) {
int32_t len = strlen(envUdfd[i]) + 1;
envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[numEnviron + i] == NULL) {
size_t len = strlen(envUdfd[i]) + 1;
envUdfdWithPEnv[outIdx] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[outIdx] == NULL) {
Comment thread
stephenkgu marked this conversation as resolved.
err = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}

tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len);
tstrncpy(envUdfdWithPEnv[outIdx], envUdfd[i], len);
outIdx++;
}
}
envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL;
envUdfdWithPEnv[outIdx] = NULL;

options.env = envUdfdWithPEnv;
} else {
if (uvEnvItems != NULL) {
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
}
options.env = envUdfd;
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
}

Expand Down Expand Up @@ -258,9 +344,18 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
}

_OVER:
if (pathTaosdLdLibHeap) {
taosMemoryFree(pathTaosdLdLibHeap);
}
if (udfdPathLdLib) {
taosMemoryFree(udfdPathLdLib);
}
if (taosFqdnEnvItem) {
taosMemoryFree(taosFqdnEnvItem);
}
if (ldLibPathEnvItem) {
taosMemoryFree(ldLibPathEnvItem);
}

if (envUdfdWithPEnv != NULL) {
int32_t i = 0;
Expand Down
19 changes: 11 additions & 8 deletions source/libs/function/src/udfd.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,13 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[],
int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
TAOS_UDF_CHECK_PTR_RCODE(plugin);
plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON;
// todo: windows support
#ifdef WINDOWS
snprintf(plugin->libPath, PATH_MAX, "%s", "taospyudf.dll");
Comment on lines +404 to +405
#elif defined(_TD_DARWIN_64)
snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.dylib");
#else
snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.so");
#endif
plugin->libLoaded = false;
const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit",
"pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart",
Expand Down Expand Up @@ -1058,20 +1063,16 @@ void udfdGetFuncBodyPath(const SUdf *udf, char *path) {
TAOS_UDF_CHECK_PTR_RVOID(udf, path);
if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
Comment thread
stephenkgu marked this conversation as resolved.
snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version,
udf->createdTime);
#endif
} else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
#endif
} else {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
Comment thread
stephenkgu marked this conversation as resolved.
snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
#endif
Expand Down Expand Up @@ -1633,7 +1634,9 @@ static int32_t udfdUvInit() {
TAOS_CHECK_RETURN(uv_loop_init(global.loop));

if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit
TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 1));
// ipc=0 — see tudf.c counterpart. Control pipe is used only for
// parent-death EOF; ipc=1 fails uv_pipe_open on Windows.
TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 0));
TAOS_CHECK_RETURN(uv_pipe_open(&global.ctrlPipe, 0));
TAOS_CHECK_RETURN(uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb));
}
Expand Down
Loading