Skip to content

Commit

Permalink
feat(stream)[TS-5469]. add support for window event notifications in …
Browse files Browse the repository at this point in the history
…stream processing

- Introduce new syntax to specify notification type and destination address
- Collect relevant event information during window computations
- Implement websocket-based notification delivery to the specified address
  • Loading branch information
JinqingKuang committed Jan 21, 2025
1 parent f3b38ed commit 7a5ee63
Show file tree
Hide file tree
Showing 43 changed files with 1,701 additions and 94 deletions.
2 changes: 1 addition & 1 deletion cmake/addr2line_CMakeLists.txt.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# addr2line
ExternalProject_Add(addr2line
GIT_REPOSITORY https://github.com/davea42/libdwarf-addr2line.git
GIT_TAG master
GIT_TAG main
SOURCE_DIR "${TD_CONTRIB_DIR}/addr2line"
BINARY_DIR "${TD_CONTRIB_DIR}/addr2line"
CONFIGURE_COMMAND ""
Expand Down
2 changes: 1 addition & 1 deletion cmake/curl_CMakeLists.txt.in
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ ExternalProject_Add(curl2
BUILD_IN_SOURCE TRUE
BUILD_ALWAYS 1
UPDATE_COMMAND ""
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 --without-nghttp2 --without-libpsl #--enable-debug
CONFIGURE_COMMAND ${CONTRIB_CONFIG_ENV} ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-websockets --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 --without-nghttp2 --without-libpsl #--enable-debug
BUILD_COMMAND make -j
INSTALL_COMMAND make install
TEST_COMMAND ""
Expand Down
6 changes: 3 additions & 3 deletions cmake/ssl_CMakeLists.txt.in
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ ExternalProject_Add(openssl
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/openssl"
BUILD_IN_SOURCE TRUE
#BUILD_ALWAYS 1
#UPDATE_COMMAND ""
CONFIGURE_COMMAND ./Configure --prefix=$ENV{HOME}/.cos-local.2 no-shared
BUILD_ALWAYS 1
UPDATE_COMMAND ""
CONFIGURE_COMMAND ${CONTRIB_CONFIG_ENV} ./Configure --prefix=$ENV{HOME}/.cos-local.2 no-shared
BUILD_COMMAND make -j
INSTALL_COMMAND make install_sw -j
TEST_COMMAND ""
Expand Down
16 changes: 12 additions & 4 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ elseif(${BUILD_WITH_COS})
file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.1/)
cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
endif(${BUILD_WITH_COS})

configure_file(${CONTRIB_TMP_FILE3} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
Expand Down Expand Up @@ -146,11 +145,16 @@ if(${BUILD_WITH_SQLITE})
cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_SQLITE})

# libcurl
if(NOT ${TD_WINDOWS})
file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.2/)
cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(NOT ${TD_WINDOWS})

# s3
if(${BUILD_WITH_S3})
cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/xml2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/libs3_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/azure_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_S3)
Expand All @@ -160,7 +164,6 @@ elseif(${BUILD_WITH_COS})
# cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# cat("${TD_SUPPORT_DIR}/apr-util_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/cos_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_COS)
endif()
Expand Down Expand Up @@ -199,6 +202,11 @@ endif()
# lemon
cat("${TD_SUPPORT_DIR}/lemon_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})

# Force specify CC=cc on MacOS. Because the default CC setting in the generated Makefile has issues finding standard library headers
IF(${TD_DARWIN})
SET(CONTRIB_CONFIG_ENV "CC=cc")
ENDIF()

# download dependencies
configure_file(${CONTRIB_TMP_FILE} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
Expand Down
5 changes: 4 additions & 1 deletion include/common/tcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ typedef enum EStreamType {
STREAM_PARTITION_DELETE_DATA,
STREAM_GET_RESULT,
STREAM_DROP_CHILD_TABLE,
STREAM_EVENT_OPEN_WINDOW,
STREAM_NOTIFY_EVENT,
} EStreamType;

#pragma pack(push, 1)
Expand Down Expand Up @@ -409,6 +409,9 @@ typedef struct STUidTagInfo {
#define UD_GROUPID_COLUMN_INDEX 1
#define UD_TAG_COLUMN_INDEX 2

// stream notify event block column
#define NOTIFY_EVENT_STR_COLUMN_INDEX 0

int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime);
int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol);

Expand Down
2 changes: 2 additions & 0 deletions include/common/tdatablock.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ bool isAutoTableName(char* ctbName);
int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap);
int32_t buildCtbNameByGroupId(const char* stbName, uint64_t groupId, char** pName);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
char** dstTableName);

int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList);

Expand Down
6 changes: 6 additions & 0 deletions include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ typedef enum ENodeType {
QUERY_NODE_TSMA_OPTIONS,
QUERY_NODE_ANOMALY_WINDOW,
QUERY_NODE_RANGE_AROUND,
QUERY_NODE_STREAM_NOTIFY_OPTIONS,

// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
Expand Down Expand Up @@ -2956,6 +2957,11 @@ typedef struct {
// 3.3.0.0
SArray* pCols; // array of SField
int64_t smaId;
// 3.3.6.0
SArray* pNotifyAddrUrls;
int32_t notifyEventTypes;
int32_t notifyErrorHandle;
int8_t notifyHistory;
} SCMCreateStreamReq;

typedef struct {
Expand Down
3 changes: 3 additions & 0 deletions include/libs/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);

int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);

int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper,
const char* stbFullName, bool newSubTableRule);

/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
Expand Down
49 changes: 37 additions & 12 deletions include/libs/nodes/cmdnodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -566,19 +566,44 @@ typedef struct SStreamOptions {
int64_t setFlag;
} SStreamOptions;

typedef enum EStreamNotifyOptionSetFlag {
SNOTIFY_OPT_ERROR_HANDLE_SET = BIT_FLAG_MASK(0),
SNOTIFY_OPT_NOTIFY_HISTORY_SET = BIT_FLAG_MASK(1),
} EStreamNotifyOptionSetFlag;

typedef enum EStreamNotifyEventType {
SNOTIFY_EVENT_WINDOW_OPEN = BIT_FLAG_MASK(0),
SNOTIFY_EVENT_WINDOW_CLOSE = BIT_FLAG_MASK(1),
} EStreamNotifyEventType;

typedef enum EStreamNotifyErrorHandleType {
SNOTIFY_ERROR_HANDLE_PAUSE,
SNOTIFY_ERROR_HANDLE_DROP,
} EStreamNotifyErrorHandleType;

typedef struct SStreamNotifyOptions {
ENodeType type;
SNodeList* pAddrUrls;
EStreamNotifyEventType eventTypes;
EStreamNotifyErrorHandleType errorHandle;
bool notifyHistory;
EStreamNotifyOptionSetFlag setFlag;
} SStreamNotifyOptions;

typedef struct SCreateStreamStmt {
ENodeType type;
char streamName[TSDB_TABLE_NAME_LEN];
char targetDbName[TSDB_DB_NAME_LEN];
char targetTabName[TSDB_TABLE_NAME_LEN];
bool ignoreExists;
SStreamOptions* pOptions;
SNode* pQuery;
SNode* pPrevQuery;
SNodeList* pTags;
SNode* pSubtable;
SNodeList* pCols;
SCMCreateStreamReq* pReq;
ENodeType type;
char streamName[TSDB_TABLE_NAME_LEN];
char targetDbName[TSDB_DB_NAME_LEN];
char targetTabName[TSDB_TABLE_NAME_LEN];
bool ignoreExists;
SStreamOptions* pOptions;
SNode* pQuery;
SNode* pPrevQuery;
SNodeList* pTags;
SNode* pSubtable;
SNodeList* pCols;
SStreamNotifyOptions* pNotifyOptions;
SCMCreateStreamReq* pReq;
} SCreateStreamStmt;

typedef struct SDropStreamStmt {
Expand Down
22 changes: 18 additions & 4 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,14 @@ typedef struct SStreamTaskSM SStreamTaskSM;
typedef struct SStreamQueueItem SStreamQueueItem;
typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;

#define SSTREAM_TASK_VER 4
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
#define SSTREAM_TASK_NEED_CONVERT_VER 2
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
#define SSTREAM_TASK_VER 5
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
#define SSTREAM_TASK_NEED_CONVERT_VER 2
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 // Append subtable name with groupId
#define SSTREAM_TASK_APPEND_STABLE_NAME_VER 4 // Append subtable name with stableName and groupId
#define SSTREAM_TASK_ADD_NOTIFY_VER 5 // Support event notification at window open/close

#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))

extern int32_t streamMetaRefPool;
extern int32_t streamTaskRefPool;
Expand Down Expand Up @@ -427,6 +431,15 @@ typedef struct STaskCheckInfo {
TdThreadMutex checkInfoLock;
} STaskCheckInfo;

typedef struct SNotifyInfo {
SArray* pNotifyAddrUrls;
int32_t notifyEventTypes;
int32_t notifyErrorHandle;
char* streamName;
char* stbFullName;
SSchemaWrapper* pSchemaWrapper;
} SNotifyInfo;

struct SStreamTask {
int64_t ver;
SStreamTaskId id;
Expand All @@ -449,6 +462,7 @@ struct SStreamTask {
SStreamState* pState; // state backend
SUpstreamInfo upstreamInfo;
STaskCheckInfo taskCheckInfo;
SNotifyInfo notifyInfo;

// the followings attributes don't be serialized
SScanhistorySchedInfo schedHistoryInfo;
Expand Down
1 change: 1 addition & 0 deletions include/util/tdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ typedef enum ELogicConditionType {
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
#define TSDB_STREAM_NOTIFY_URL_LEN 128 // it includes the terminating '\0'
#define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_PRIVILEDGE_CONDITION_LEN 48 * 1024
Expand Down
3 changes: 3 additions & 0 deletions include/util/tlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ void taosResetLog();
void taosDumpData(uint8_t *msg, int32_t len);
void taosSetNoNewFile();

// Fast uint64_t to string conversion, equivalent to sprintf(buf, "%lu", val) but with 10x better performance.
char *u64toaFastLut(uint64_t val, char *buf);

void taosPrintLog(const char *flags, int32_t level, int32_t dflag, const char *format, ...)
#ifdef __GNUC__
__attribute__((format(printf, 4, 5)))
Expand Down
21 changes: 17 additions & 4 deletions source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,23 @@ target_link_libraries(
INTERFACE api
)

if(NOT ${TD_WINDOWS})
target_include_directories(
common
PUBLIC "$ENV{HOME}/.cos-local.2/include"
)

find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
target_link_libraries(
common
PUBLIC ${CURL_LIBRARY}
PUBLIC ${SSL_LIBRARY}
PUBLIC ${CRYPTO_LIBRARY}
)
endif()

if(${BUILD_S3})
if(${BUILD_WITH_S3})
target_include_directories(
Expand All @@ -65,9 +82,6 @@ if(${BUILD_S3})
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2)
find_library(S3_LIBRARY s3)
find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
target_link_libraries(
common

Expand All @@ -87,7 +101,6 @@ if(${BUILD_S3})
find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/)
find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/)
find_library(MINIXML_LIBRARY mxml)
find_library(CURL_LIBRARY curl)
target_link_libraries(
common

Expand Down
35 changes: 35 additions & 0 deletions source/common/src/msg/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -9959,6 +9959,16 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
}

TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->smaId));

int32_t addrSize = taosArrayGetSize(pReq->pNotifyAddrUrls);
TAOS_CHECK_EXIT(tEncodeI32(&encoder, addrSize));
for (int32_t i = 0; i < addrSize; ++i) {
const char *url = taosArrayGetP(pReq->pNotifyAddrUrls, i);
TAOS_CHECK_EXIT((tEncodeCStr(&encoder, url)));
}
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->notifyEventTypes));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->notifyErrorHandle));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->notifyHistory));
tEndEncode(&encoder);

_exit:
Expand Down Expand Up @@ -10093,6 +10103,30 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->smaId));
}

if (!tDecodeIsEnd(&decoder)) {
int32_t addrSize = 0;
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &addrSize));
pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
if (pReq->pNotifyAddrUrls == NULL) {
TAOS_CHECK_EXIT(terrno);
}
for (int32_t i = 0; i < addrSize; ++i) {
char *url = NULL;
TAOS_CHECK_EXIT(tDecodeCStr(&decoder, &url));
url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
if (url == NULL) {
TAOS_CHECK_EXIT(terrno);
}
if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
taosMemoryFree(url);
TAOS_CHECK_EXIT(terrno);
}
}
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->notifyEventTypes));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->notifyErrorHandle));
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->notifyHistory));
}

tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
Expand Down Expand Up @@ -10155,6 +10189,7 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosArrayDestroy(pReq->fillNullCols);
taosArrayDestroy(pReq->pVgroupVerList);
taosArrayDestroy(pReq->pCols);
taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
}

int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) {
Expand Down
27 changes: 27 additions & 0 deletions source/common/src/tdatablock.c
Original file line number Diff line number Diff line change
Expand Up @@ -3061,6 +3061,33 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
return code;
}

int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
char** dstTableName) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

if (parTbName[0]) {
if (newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 &&
stbFullName) {
*dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);

tstrncpy(*dstTableName, parTbName, TSDB_TABLE_NAME_LEN);
code = buildCtbNameAddGroupId(stbFullName, *dstTableName, gid, TSDB_TABLE_NAME_LEN);
TSDB_CHECK_CODE(code, lino, _end);
} else {
*dstTableName = taosStrdup(parTbName);
TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
}
} else {
code = buildCtbNameByGroupId(stbFullName, gid, dstTableName);
TSDB_CHECK_CODE(code, lino, _end);
}

_end:
return code;
}

// return length of encoded data, return -1 if failed
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
int32_t code = blockDataCheck(pBlock);
Expand Down
Loading

0 comments on commit 7a5ee63

Please sign in to comment.