2019/1/16:增長了容許5個附件發送、smtp等配置的字段。sql
在ETL數據整合過程當中,確定會涉及到email的通知,好比ETL的執行狀況彙報,執行耗時彙報,關鍵數據更新狀況等信息彙報等,這些信息都是須要及時給到相應的operation人員或者使用BI數據的人員。數據庫
可是,若是一開始沒有規劃好郵件推送的一些基本信息,有可能會致使後期郵件發送混亂和很差管理等問題,例如:每一個人都有本身的etl,每一個人都會去開發本身的郵件通知,那隨着時間推移,後期哪些郵件要取消、哪些通知人要屏蔽等都是個難事,可能須要打開全部的ETL job去檢查,去修改,耗時耗力,很是不利於管理。數據庫設計
在實現該方案的時候,我主要考慮瞭如下幾個方面:oop
一、每一個人須要調用發送郵件的時候,儘可能不要重複再作一次拖拉整套組件了,拖拉一次公共組件就行了,因此我選擇用joblet來實現這個。spa
二、郵件的一些基礎公共信息必須在一個地方維護,好比發送、接收郵件列表,發送記錄等信息,因此我設計了數據庫表來存放這些信息,這樣只要更新數據庫信息,就可使得全局都使用統一的信息。設計
三、信息的發送、狀態、生成的方式均可以靈活控制,因此我設計了一個表來存儲這些信息,並且經過存儲過程生成具體的email信息,這樣能夠追蹤發送記錄等信息。3d
四、由於talend joblet支持變量,因此我儘可能將發送郵件組件中的一些變量都設計到數據庫表中,這樣方便於維護和修改。rest
數據庫設計主要有2張表:mail_send_group、mail_send_list_recblog
mail_send_group:該表是用於記錄發送者和接收者之間的信息,維護在這裏可讓後去維護更簡單,修改數據庫則全局啓用。ip
IF (OBJECT_ID(N'[chk].[mail_send_group]', N'U') IS NOT NULL) BEGIN PRINT N'刪除表:[chk].[mail_send_group]'; DROP TABLE [chk].[mail_send_group]; END GO CREATE TABLE [chk].[mail_send_group] ( [group_id] NVARCHAR(50) NOT NULL,--主鍵 [mail_to] NVARCHAR(1000) NOT NULL,--接收者郵箱列表,多個郵箱用;分割 [mail_from] NVARCHAR(100) NOT NULL,--發送者郵箱 [sender_name] NVARCHAR(100) NOT NULL,--發送者暱稱 [mail_cc] NVARCHAR(1000) NULL,--抄送郵箱列表,多個郵箱用;分割 [mail_bcc] NVARCHAR(100) NULL,--密送郵箱列表,多個郵箱用;分割 [smtp_host] NVARCHAR(100) NOT NULL,--smtp host地址 [smtp_port] INT NOT NULL,--smtp host 端口號 [user_name] NVARCHAR(50) NOT NULL,--郵箱用戶名 [user_pwd] NVARCHAR(50) NOT NULL,--郵箱用戶密碼 [create_date] DATETIME NOT NULL,--建立日期 [status] SMALLINT NULL--狀態(0禁用,1啓用) ) GO SELECT * FROM [chk].[mail_send_group] |
mail_send_list_rec:該表是用於記錄email生成的記錄和發送記錄的,每條信息經過group_id和上表關聯,就能夠知道每條信息是由誰發給誰的,何時發送的。
IF (OBJECT_ID(N'[chk].[mail_send_list_rec]', N'U') IS NOT NULL) BEGIN PRINT N'刪除表:[chk].[mail_send_list_rec]'; DROP TABLE [chk].[mail_send_list_rec]; END GO CREATE TABLE [chk].[mail_send_list_rec] ( [mail_id] NVARCHAR(50) NOT NULL,--主鍵 [group_id] NVARCHAR(50) NOT NULL,--所屬的group id,用於確認發送接收等信息 [scope] NVARCHAR(100) NOT NULL,--業務,用於區分不一樣業務生成的郵件,至關於一個分類 [subject] NVARCHAR(100) NOT NULL,--主題 [message] NVARCHAR(4000) NOT NULL,--正文,支持HTML代碼,建議是用HTML [create_date] DATETIME NOT NULL,--建立日期 [send_date] DATETIME NULL,--發送日期 [send_status] SMALLINT NULL,--發送狀態(0建立完未發送,1已經發送) [atta01_path] NVARCHAR(200) NULL,--第1個發送附件路徑絕對路徑,包含文件名/data/mailatts/ids/checkret.csv [atta02_path] NVARCHAR(200) NULL,--第1個發送附件路徑絕對路徑,包含文件名 [atta03_path] NVARCHAR(200) NULL,--第1個發送附件路徑絕對路徑,包含文件名 [atta04_path] NVARCHAR(200) NULL,--第1個發送附件路徑絕對路徑,包含文件名 [atta05_path] NVARCHAR(200) NULL--第1個發送附件路徑絕對路徑,包含文件名 ) GO |
"SELECT [a].[mail_id] ,[a].[subject] ,[a].[message] ,[b].[mail_from] ,[b].[mail_to] ,[b].[sender_name] ,[b].[mail_cc] ,[b].[mail_bcc] ,[b].[status] ,[a].[atta01_path] ,[a].[atta02_path] ,[a].[atta03_path] ,[a].[atta04_path] ,[a].[atta05_path] ,[b].[smtp_host] ,[b].[smtp_port] ,[b].[user_name] ,[b].[user_pwd] FROM [chk].[mail_send_list_rec] AS a INNER JOIN [chk].[mail_send_group] AS b ON ([a].[group_id] = [b].[group_id]) WHERE [a].[mail_id] = '" + ((String)globalMap.get("curr_mail_id")) + "' AND ISNULL([b].[status], 0) = 1" |
生成email:主要功能就是按照你想要發送的內容生成一個message,並插入到數據庫表中便可。
IF (OBJECT_ID(N'[chk].[usp_insert_ids_mail_send_list_rec]', N'P') IS NOT NULL) BEGIN PRINT N'刪除存儲過程:[chk].[usp_insert_ids_mail_send_list_rec]'; DROP PROC [chk].[usp_insert_ids_mail_send_list_rec]; END GO CREATE PROC [chk].[usp_insert_ids_mail_send_list_rec] ( @curr_date NVARCHAR(20), @atta01_path NVARCHAR(200), @atta02_path NVARCHAR(200), @atta03_path NVARCHAR(200), @atta04_path NVARCHAR(200), @atta05_path NVARCHAR(200) ) AS --==================================================================================================================================== -- ProcedureName : chk.usp_insert_ids_mail_send_list_rec -- Author : john.xiong -- CreateDate : 2019-01-02 -- Description : 生成daily的detail mail content /*************************************Parameters參數說明******************************************************************************* -- @curr_date : 數據實行日期YYYYMMDD **************************************Modfied List修改記錄***************************************************************************** -- Modified Date Modified User Version Modified Reason ************************************************************************************************************************************** -- 2019-01-02 john.xiong V01.00.00 初始化版本 **************************************************************************************************************************************/ --==================================================================================================================================== BEGIN BEGIN TRY DECLARE @begin_time DATETIME ,@end_time DATETIME ,@cost_time INT; SET @begin_time = DATEADD(HOUR, 8, GETDATE()); INSERT INTO [chk].[tb_proc_cost_log] ( [proc_name] ,[Object_name] ,[execute_time] ,[action] ,[remark] ,[cost_time] ) SELECT N'chk.usp_insert_ids_mail_send_list_rec' AS [proc_name] ,N'chk.mail_send_list_rec' AS [Object_name] ,@begin_time AS [execute_time] ,N'start' AS [action] ,'' AS [remark] ,0 AS [cost_time] DECLARE @mail_id UNIQUEIDENTIFIER, @scope NVARCHAR(100), @group_id UNIQUEIDENTIFIER, @subject NVARCHAR(100), @create_date DATETIME, @message NVARCHAR(4000), @temp_message NVARCHAR(4000), @count INT, @count1 INT, @count2 INT, @error_count INT SET @mail_id = NEWID(); SET @scope = N'IDS'; SET @group_id = N'8D42D25D-59C7-4A5E-AE9C-4A5F24D910B0' SET @subject = N'IDS daily - job運行狀況'; SET @create_date = DATEADD(HOUR, 8, GETDATE()); SET @count1 = 0; SET @count2 = 0; SET @error_count = 0; SET @message = '<span style="color:#000; line-height:30px"><ol>'; SET @temp_message = ''; SET @count = 0; SELECT @count = COUNT(*) FROM [chk].[log_move_blob_rec] AS a WHERE LEFT([a].[rec_load_time], 8) = @curr_date AND ([a].[scope] IN ('ids_regular_data', 'ids_regular_rtm') OR [a].[blobFileName] LIKE '%LCH%') SET @message = @message + N'<li>從landing搬移blob文件總數:' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)); SET @temp_message = ''; SET @count = 0; SELECT @count = COUNT(*) FROM [chk].[log_move_blob_rec] AS a WHERE LEFT([a].[rec_load_time], 8) = @curr_date AND [a].[scope] = 'ids_regular_data' SET @message = @message + N'<br>經銷商regular data文件數:' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)); SET @temp_message = ''; SET @count = 0; SELECT @count = COUNT(*) FROM [chk].[log_move_blob_rec] AS a WHERE LEFT([a].[rec_load_time], 8) = @curr_date AND [a].[scope] IN ('ids_regular_rtm') SET @message = @message + N'<br>restatement data文件數:' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)); SET @temp_message = ''; SET @count = 0; SELECT @count = COUNT(*) FROM [chk].[log_move_blob_rec] AS a WHERE LEFT([a].[rec_load_time], 8) = @curr_date AND [a].[blobFileName] LIKE '%LCH%' SET @message = @message + N'<br>local customer hierarchy daily文件數:' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)) + '</li>'; SET @temp_message = ''; SET @count = 0; SELECT @count = SUM([a].[file_count]) FROM [chk].[log_blob_file_deal] AS a WHERE LOWER([a].[data_scope]) = 'ids' AND LOWER([a].[deal_level]) = 'ext' AND LOWER([a].[job_name]) = LOWER('IDS_Data_Blob_To_Stg_Ongoing_Loop_New_1_3') AND [a].[remark] LIKE '%tFileList Count%' AND CONVERT(NVARCHAR(8), [a].[deal_date], 112) = @curr_date SET @message = @message + N'<li>實際處理經銷商regular data文件數:' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)) + '</li>'; SET @temp_message = ''; SET @count = 0; SELECT @count = SUM([a].[file_count]) FROM [chk].[log_blob_file_deal] AS a WHERE LOWER([a].[data_scope]) = 'ids' AND LOWER([a].[deal_level]) = 'ext' AND LOWER([a].[job_name]) = LOWER('IDS_Data_Blob_To_Stg_SalesDaily_Rtm_New_1_4') AND [a].[remark] LIKE '%tFileList Count%' AND CONVERT(NVARCHAR(8), [a].[deal_date], 112) = @curr_date SET @message = @message + N'<li>實際處理restatement data文件數:' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)) + '</li>'; SET @temp_message = ''; SET @count = 0; SELECT @count = COUNT([a].[file_name]) FROM [chk].[log_file_deal_error_rec] AS a WHERE LOWER([a].[data_scope]) = 'ids' AND LOWER([a].[deal_level]) = 'ext' AND LOWER([a].[job_name]) = LOWER('IDS_Data_Blob_To_Stg_Ongoing_Loop_New_1_3') AND CONVERT(NVARCHAR(8), [a].[deal_date], 112) = @curr_date SET @message = @message + N'<li>沒法解壓的經銷商regular data文件數:' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)) + '</li>'; SET @temp_message = ''; SET @count = 0; SELECT @count = COUNT([a].[file_name]) FROM [chk].[log_file_deal_error_rec] AS a WHERE LOWER([a].[data_scope]) = 'ids' AND LOWER([a].[deal_level]) = 'ext' AND LOWER([a].[job_name]) = LOWER('IDS_Data_Blob_To_Stg_SalesDaily_Rtm_New_1_4') AND CONVERT(NVARCHAR(8), [a].[deal_date], 112) = @curr_date SET @message = @message + N'<li>沒法解壓的restatement data文件數:' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)) + '</li>'; SET @temp_message = ''; SET @count = 0; SELECT @count = SUM([a].[file_count]) FROM [chk].[log_blob_file_deal] AS a WHERE LOWER([a].[data_scope]) = 'ids' AND LOWER([a].[deal_level]) = 'ext' AND LOWER([a].[job_name]) = LOWER('IDS_RCS_Local_Master_Data_Daily_1_2') AND [a].[remark] LIKE '%tFileList Count lch%' AND CONVERT(NVARCHAR(8), [a].[deal_date], 112) = @curr_date SET @message = @message + N'<li>處理local customer hierarchy daily文件數:' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)); SET @temp_message = ''; SET @count = 0; SET @count1 = 0; SELECT TOP (1) @count1 = [a].[row_count] FROM [chk].[log_table_data_rec] AS a WHERE [a].[data_scope] = 'rcs dim' AND [a].[table_name] = 'stg.cust_ids_rcs_local_customer_hierarchy_daily' AND CONVERT(NVARCHAR(8), [a].[action_time], 112) = @curr_date ORDER BY [a].[action_time] DESC SET @message = @message + N'<br>文件數據行數:' + CONVERT(NVARCHAR(20), ISNULL(@count1, 0)); SET @temp_message = ''; SET @count = 0; SET @count2 = 0; SELECT @count2 = COUNT(*) FROM [stg].[cust_ids_rcs_local_customer_hierarchy_daily] AS a WHERE LEFT([a].[rec_load_time], 8) = @curr_date SET @message = @message + N'<br>入庫數據行數:' + CONVERT(NVARCHAR(20), ISNULL(@count2, 0)) + '</li>'; IF (@count1 <> @count2) BEGIN SET @error_count = @error_count + 1; END IF (OBJECT_ID(N'[chk].[temp_mail_send_proc_error_list_ids_daily]', N'U') IS NOT NULL) BEGIN DROP TABLE [chk].[temp_mail_send_proc_error_list_ids_daily]; END /*生成錯誤proc的記錄*/ CREATE TABLE [chk].[temp_mail_send_proc_error_list_ids_daily] WITH ( DISTRIBUTION = ROUND_ROBIN, CLUSTERED COLUMNSTORE INDEX ) AS SELECT [a].[proc_name] ,ROW_NUMBER() OVER(ORDER BY [a].[error_time] ASC) AS [Num] FROM [chk].[log_proc_error_rec] AS a WHERE [a].[proc_name] LIKE '%ids%' AND [a].[proc_name] NOT LIKE '%mail%' AND CONVERT(NVARCHAR(8), [a].[error_time], 112) = @curr_date SET @count = 0; SELECT @count = COUNT(*) FROM [chk].[temp_mail_send_proc_error_list_ids_daily]; IF (@count > 0) BEGIN SET @message = @message + N'<li style="color:red">有錯誤的PROC:' + CONVERT(NVARCHAR(20), @count); SET @error_count = @error_count + @count; END WHILE (@count > 0) BEGIN SELECT @temp_message = [proc_name] FROM [chk].[temp_mail_send_proc_error_list_ids_daily] WHERE [Num] = @count; SET @message = @message + N'<br />' + @temp_message + '; '; SET @count = @count - 1; END SET @message = @message + '</li>'; IF (@error_count <> 0) BEGIN SET @subject = @subject + ':有 ' + CONVERT(NVARCHAR(20), @error_count) + ' 個錯誤'; END SET @subject = @curr_date + N' ' + @subject; SET @message = @message + '</ol></span>' PRINT @message INSERT INTO [chk].[mail_send_list_rec] ( [mail_id] ,[group_id] ,[scope] ,[subject] ,[message] ,[create_date] ,[send_date] ,[send_status] ,[atta01_path] ,[atta02_path] ,[atta03_path] ,[atta04_path] ,[atta05_path] ) SELECT @mail_id, @group_id, @scope, @subject, @message, @create_date, NULL, 0, @atta01_path, @atta02_path, @atta03_path, @atta04_path, @atta05_path SET @end_time = DATEADD(HOUR, 8, GETDATE()); SET @cost_time = DATEDIFF(SECOND, @begin_time, @end_time); INSERT INTO [chk].[tb_proc_cost_log] ( [proc_name] ,[Object_name] ,[execute_time] ,[action] ,[remark] ,[cost_time] ) SELECT N'chk.usp_insert_ids_mail_send_list_rec' AS [proc_name] ,N'chk.mail_send_list_rec' AS [Object_name] ,@end_time AS [execute_time] ,N'end' AS [action] ,CONVERT(NVARCHAR(50), @mail_id) AS [remark] ,@cost_time AS [cost_time] PRINT N'Exec success'; SELECT @mail_id AS [curr_mail_id] END TRY BEGIN CATCH INSERT INTO [chk].[log_proc_error_rec] ( [proc_name] ,[error_source] ,[error_time] ,[error_severity] ,[error_state] ,[error_msg] ,[log_user] ) SELECT N'chk.usp_insert_ids_mail_send_list_rec' AS [proc_name] ,ERROR_PROCEDURE() AS [error_source] ,DATEADD(HOUR, 8, GETDATE()) AS [error_time] ,ERROR_SEVERITY() AS [error_severity] ,ERROR_STATE() AS [error_state] ,ERROR_MESSAGE() AS [error_msg] ,SUSER_SNAME() AS [log_user] PRINT N'Exec failed'; END CATCH END |
更新email by mail_id
IF (OBJECT_ID(N'[chk].[usp_update_mail_send_list_rec_by_mail_id]', N'P') IS NOT NULL) BEGIN PRINT N'刪除存儲過程:[chk].[usp_update_mail_send_list_rec_by_mail_id]'; DROP PROC [chk].[usp_update_mail_send_list_rec_by_mail_id]; END GO CREATE PROC [chk].[usp_update_mail_send_list_rec_by_mail_id] ( @mail_id NVARCHAR(50) ,@send_date DATETIME ,@send_status SMALLINT ) AS --==================================================================================================================================== -- ProcedureName : [chk].[usp_update_mail_send_list_rec_by_mail_id] -- Author : john.xiong -- CreateDate : 2018-12-24 -- Description : 根據mail_id更新mail發生記錄信息 /*************************************Parameters參數說明******************************************************************************* -- @mail_id : 郵件id NEWID **************************************Modfied List修改記錄***************************************************************************** -- Modified Date Modified User Version Modified Reason ************************************************************************************************************************************** -- 2018-12-24 john.xiong V01.00.00 初始化版本 **************************************************************************************************************************************/ --==================================================================================================================================== BEGIN BEGIN TRY DECLARE @begin_time DATETIME ,@end_time DATETIME ,@cost_time INT SET @begin_time = DATEADD(HOUR, 8, GETDATE()); INSERT INTO [chk].[tb_proc_cost_log] ( [proc_name] ,[Object_name] ,[execute_time] ,[action] ,[remark] ,[cost_time] ) SELECT N'chk.usp_update_mail_send_list_rec_by_mail_id' AS [proc_name] ,N'chk.mail_send_list_rec' AS [Object_name] ,@begin_time AS [execute_time] ,N'start' AS [action] ,N'' AS [remark] ,0 AS [cost_time] IF (@mail_id IS NULL) BEGIN RAISERROR (N'mail id錯誤!強制退出', 16, 1); END IF (@send_date IS NULL) BEGIN SET @send_date = DATEADD(HOUR, 8, GETDATE()); END UPDATE [chk].[mail_send_list_rec] SET [send_date] = @send_date, [send_status] = @send_status WHERE [mail_id] = @mail_id; SET @end_time = DATEADD(HOUR, 8, GETDATE()); SET @cost_time = DATEDIFF(SECOND, @begin_time, @end_time); INSERT INTO [chk].[tb_proc_cost_log] ( [proc_name] ,[Object_name] ,[execute_time] ,[action] ,[remark] ,[cost_time] ) SELECT N'chk.usp_update_mail_send_list_rec_by_mail_id' AS [proc_name] ,N'chk.mail_send_list_rec' AS [Object_name] ,@end_time AS [execute_time] ,N'end' AS [action] ,N'' AS [remark] ,@cost_time AS [cost_time] PRINT N'exec successed' END TRY BEGIN CATCH INSERT INTO [chk].[log_proc_error_rec] ( [proc_name] ,[error_source] ,[error_time] ,[error_severity] ,[error_state] ,[error_msg] ,[log_user] ) SELECT N'chk.usp_update_mail_send_list_rec_by_mail_id' AS [proc_name] ,ERROR_PROCEDURE() AS [error_source] ,DATEADD(HOUR, 8, GETDATE()) AS [error_time] ,ERROR_SEVERITY() AS [error_severity] ,ERROR_STATE() AS [error_state] ,ERROR_MESSAGE() AS [error_msg] ,SUSER_SNAME() AS [log_user] PRINT N'exec failed' END CATCH END |
在須要發送email的job中,將joblet拖拉過去便可,而後生成一個你須要發送的郵件的mail_id,經過input組件將其傳遞到joblet組件的input輸入中,這樣就能夠將joblet融入到job中。
若是您以爲此文章對您有幫助,請點擊右下方【推薦】讓更多人看到,thanks!