Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions source/common/src/tglobal.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,7 @@ bool tsIfAdtFse = false; // ADT-FSE algorithom or origina
char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR

// udf
#if defined(WINDOWS) || !defined(USE_UDF)
bool tsStartUdfd = false;
#else
bool tsStartUdfd = true;
#endif

Comment thread
stephenkgu marked this conversation as resolved.
// wal
int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L);
Expand Down
4 changes: 0 additions & 4 deletions source/dnode/mnode/impl/src/mndFunc.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,6 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {

TAOS_CHECK_GOTO(tDeserializeSCreateFuncReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

#ifdef WINDOWS
code = TSDB_CODE_MND_INVALID_PLATFORM;
goto _OVER;
#endif
mInfo("func:%s, start to create, size:%d", createReq.name, createReq.codeLen);
TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_FUNC), NULL, _OVER);

Expand Down
2 changes: 1 addition & 1 deletion source/libs/executor/src/projectoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult,
int32_t code = TSDB_CODE_SUCCESS;

SExecTaskInfo* savedTaskInfo = gTaskScalarExtra.pTaskInfo;
__typeof__(gTaskScalarExtra.isTaskKilled) savedIsTaskKilled = gTaskScalarExtra.isTaskKilled;
sclIsTaskKilled savedIsTaskKilled = gTaskScalarExtra.isTaskKilled;

if (pTaskInfo != NULL) {
gTaskScalarExtra.pTaskInfo = pTaskInfo;
Expand Down
83 changes: 56 additions & 27 deletions source/libs/function/src/tudf.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,17 @@ SUdfdData udfdGlobal = {0};
int32_t udfStartUdfd(int32_t startDnodeId);
void udfStopUdfd();

extern char **environ;
/* Note: udfd spawning now uses libuv's portable ``uv_os_environ`` instead
* of the POSIX ``extern char **environ`` symbol so the same code works on
* Windows. */

#ifdef WINDOWS
#define UDF_LIB_PATH_ENV "PATH"
#define UDF_LIB_PATH_SEP ';'
#else
#define UDF_LIB_PATH_ENV "LD_LIBRARY_PATH"
#define UDF_LIB_PATH_SEP ':'
#endif

static int32_t udfSpawnUdfd(SUdfdData *pData);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal);
Expand Down Expand Up @@ -121,10 +131,17 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {

options.exit_cb = udfUdfdExit;

TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));
// ipc=0: this is a one-way control pipe used only to detect parent
// death via read EOF. No handle-passing is needed. ipc=1 forces a
// libuv pid-handshake on Windows that fails for child-stdio pipes.
TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 0));

uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
// UV_OVERLAPPED_PIPE is required on Windows: without it libuv uses
// CreatePipe() which yields a non-overlapped anonymous pipe, and the
// child's uv_pipe_open(fd 0) fails because libuv's IOCP machinery
// requires FILE_FLAG_OVERLAPPED. The flag is a no-op on POSIX.
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_OVERLAPPED_PIPE;
Comment thread
stephenkgu marked this conversation as resolved.
child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
child_stdio[1].flags = UV_IGNORE;
child_stdio[2].flags = UV_INHERIT_FD;
Expand All @@ -148,7 +165,7 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {

char pathTaosdLdLib[512] = {0};
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib);
int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
int32_t ret = uv_os_getenv(UDF_LIB_PATH_ENV, pathTaosdLdLib, &taosdLdLibPathLen);
if (ret != UV_ENOBUFS) {
taosdLdLibPathLen = strlen(pathTaosdLdLib);
}
Expand All @@ -157,15 +174,15 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath);
tstrncpy(udfdPathLdLib, tsUdfdLdLibPath, sizeof(udfdPathLdLib));

udfdPathLdLib[udfdLdLibPathLen] = ':';
udfdPathLdLib[udfdLdLibPathLen] = UDF_LIB_PATH_SEP;
tstrncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen - 1);
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) {
fnInfo("udfd LD_LIBRARY_PATH: %s", udfdPathLdLib);
fnInfo("udfd %s: %s", UDF_LIB_PATH_ENV, udfdPathLdLib);
} else {
fnError("can not set correct udfd LD_LIBRARY_PATH");
fnError("can not set correct udfd %s", UDF_LIB_PATH_ENV);
}
char ldLibPathEnvItem[1024 + 32] = {0};
snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib);
snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", UDF_LIB_PATH_ENV, udfdPathLdLib);

char *taosFqdnEnvItem = NULL;
char *taosFqdn = getenv("TAOS_FQDN");
Expand All @@ -185,47 +202,59 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {

char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL};

char **envUdfdWithPEnv = NULL;
if (environ != NULL) {
// Inherit the parent process's environment so spawned udfd sees the same
// PATH / locale / config-related variables. Using libuv's portable
// ``uv_os_environ`` instead of POSIX ``extern char **environ`` lets this
// path work on Windows as well as Linux/macOS.
char **envUdfdWithPEnv = NULL;
uv_env_item_t *uvEnvItems = NULL;
int uvEnvItemCount = 0;
int32_t uvEnvErr = uv_os_environ(&uvEnvItems, &uvEnvItemCount);
if (uvEnvErr == 0 && uvEnvItems != NULL && uvEnvItemCount > 0) {
int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd);
int32_t numEnviron = 0;
while (environ[numEnviron] != NULL) {
numEnviron++;
}

envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *));
envUdfdWithPEnv = (char **)taosMemoryCalloc(uvEnvItemCount + lenEnvUdfd, sizeof(char *));
Comment thread
stephenkgu marked this conversation as resolved.
if (envUdfdWithPEnv == NULL) {
err = TSDB_CODE_OUT_OF_MEMORY;
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
goto _OVER;
}

for (int32_t i = 0; i < numEnviron; i++) {
int32_t len = strlen(environ[i]) + 1;
envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[i] == NULL) {
int32_t outIdx = 0;
for (int i = 0; i < uvEnvItemCount; i++) {
int32_t nameLen = strlen(uvEnvItems[i].name);
int32_t valueLen = strlen(uvEnvItems[i].value);
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
int32_t len = nameLen + 1 /* '=' */ + valueLen + 1 /* '\0' */;
envUdfdWithPEnv[outIdx] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[outIdx] == NULL) {
err = TSDB_CODE_OUT_OF_MEMORY;
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
goto _OVER;
}

tstrncpy(envUdfdWithPEnv[i], environ[i], len);
snprintf(envUdfdWithPEnv[outIdx], len, "%s=%s", uvEnvItems[i].name, uvEnvItems[i].value);
outIdx++;
}
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
uvEnvItems = NULL;

for (int32_t i = 0; i < lenEnvUdfd; i++) {
if (envUdfd[i] != NULL) {
int32_t len = strlen(envUdfd[i]) + 1;
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[numEnviron + i] == NULL) {
envUdfdWithPEnv[outIdx] = (char *)taosMemoryCalloc(len, 1);
if (envUdfdWithPEnv[outIdx] == NULL) {
Comment thread
stephenkgu marked this conversation as resolved.
err = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}

tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len);
tstrncpy(envUdfdWithPEnv[outIdx], envUdfd[i], len);
outIdx++;
}
}
envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL;
envUdfdWithPEnv[outIdx] = NULL;

options.env = envUdfdWithPEnv;
} else {
if (uvEnvItems != NULL) {
uv_os_free_environ(uvEnvItems, uvEnvItemCount);
}
options.env = envUdfd;
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
}

Expand Down
26 changes: 15 additions & 11 deletions source/libs/function/src/udfd.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,13 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[],
int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
TAOS_UDF_CHECK_PTR_RCODE(plugin);
plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON;
// todo: windows support
#ifdef WINDOWS
snprintf(plugin->libPath, PATH_MAX, "%s", "taospyudf.dll");
Comment on lines +404 to +405
#elif defined(_TD_DARWIN_64)
snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.dylib");
#else
snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.so");
#endif
plugin->libLoaded = false;
const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit",
"pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart",
Expand Down Expand Up @@ -1058,23 +1063,15 @@ void udfdGetFuncBodyPath(const SUdf *udf, char *path) {
TAOS_UDF_CHECK_PTR_RVOID(udf, path);
if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
Comment thread
stephenkgu marked this conversation as resolved.
snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version,
udf->createdTime);
#endif
} else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
#endif
} else {
#ifdef WINDOWS
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
#else
snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
#endif
}
}

Expand Down Expand Up @@ -1633,7 +1630,9 @@ static int32_t udfdUvInit() {
TAOS_CHECK_RETURN(uv_loop_init(global.loop));

if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit
TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 1));
// ipc=0 — see tudf.c counterpart. Control pipe is used only for
// parent-death EOF; ipc=1 fails uv_pipe_open on Windows.
TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 0));
TAOS_CHECK_RETURN(uv_pipe_open(&global.ctrlPipe, 0));
TAOS_CHECK_RETURN(uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb));
}
Expand Down Expand Up @@ -1765,7 +1764,12 @@ void udfdDeinitResidentFuncs() {
}

int32_t udfdCreateUdfSourceDir() {
#ifdef WINDOWS
snprintf(global.udfDataDir, PATH_MAX, "%s.udf", tsDataDir);
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
#else
snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsDataDir);
#endif
//snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsDataDir);
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
int32_t code = taosMkDir(global.udfDataDir);
if (code != TSDB_CODE_SUCCESS) {
snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsTempDir);
Comment thread
stephenkgu marked this conversation as resolved.
Outdated
Expand Down
Loading