前言
好久沒有做項目了,這次做一個發送郵件的小項目。發郵件是一個比較耗時的操作,之前在我的個人博客里面回復評論和友鏈申請是會通過發送郵件來通知對方的,不過當時只是簡單的進行了異步操作。那么這次來使用RabbitMQ去統一發送郵件,我的想法是通過調用郵件發送接口,將請求發送到隊列。然后在隊列中接收并執行郵件發送操作。本文采用簡單的點對點模式:
在點對點模式中,只會有一個消費者進行消費。
架構圖
簡單描述下項目結構。項目主要分為生產者、RabbitMQ、消費者這3個對象。
-
生產者(Publisher):負責將郵件發送請求發送到RabbitMQ的隊列中。
-
RabbitMQ服務器:作為消息中間件,用于接收并存儲生產者發送的消息。
-
消費者(Consumer):從RabbitMQ的隊列中接收郵件發送請求,并執行實際的郵件發送操作。
項目結構
-
RabbitMQEmailProject
-
EamilApiProject 生產者
-
Controllers 控制器
-
Service 服務
-
RabiitMQClient 消費者
-
Program 主程序
-
Model 實體類
開始編碼(一階段)
首先我們先簡單的將生產者和消費者代碼完成,讓生產者能夠發送消息,消費者能夠接受并處理消息。代碼有點多,不過注釋也多很容易看懂。給生產者和消費者都安裝上用于處理RabiitMQ連接的Nuget包:
dotnet add package RabbitMQ.Client
生產者
EamilApiProject
配置文件
appsetting.json
"RabbitMQ": { "Hostname": "localhost", "Port": "5672", "Username": "guest", "Password": "guest"
}
控制器
[ApiController]
[Route("[controller]")]
public class SendEmailController : ControllerBase
{ private readonly EmailService _emailService; public SendEmailController(EmailService emailService) { _emailService = emailService; } [HttpPost(Name = "SendEmail")] public IActionResult Post([FromBody] EmailDto emailRequest) { _emailService.SendEamil(emailRequest); return Ok("郵件已發送"); }
}
服務
RabbitMQ連接服務
public class RabbitMqConnectionFactory :IDisposable
{ private readonly RabbitMqSettings _settings; private IConnection _connection; public RabbitMqConnectionFactory (IOptions<RabbitMqSettings> settings) { _settings = settings.Value; } public IModel CreateChannel() { if (_connection == null || _connection.IsOpen == false) { var factory = new ConnectionFactory() { HostName = _settings.Hostname, UserName = _settings.Username, Password = _settings.Password }; _connection = factory.CreateConnection(); } return _connection.CreateModel(); } public void Dispose() { if (_connection != null) { if (_connection.IsOpen) { _connection.Close(); } _connection.Dispose(); } }
}
發送郵件服務
public class EmailService
{private readonly RabbitMqConnectionFactory _connectionFactory;public EmailService(RabbitMqConnectionFactory connectionFactory){_connectionFactory = connectionFactory;}public void SendEamil(EmailDto emailDto){using var channel = _connectionFactory.CreateChannel();var properties = channel.CreateBasicProperties();properties.Persistent = true;//消息持久化var message = JsonConvert.SerializeObject(emailDto);var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish( string.Empty, "email_queue", properties, body);}
}
注冊服務
builder.Services.Configure<RabbitMqSettings>(builder.Configuration.GetSection("RabbitMQ"));
builder.Services.AddSingleton<RabbitMqConnectionFactory >();
builder.Services.AddTransient<EmailService>();
實體
Model
public class EmailDto
{ /// <summary> /// 郵箱地址 /// </summary> public string Email { get; set; } /// <summary> /// 主題 /// </summary> public string Subject { get; set; } /// <summary> /// 內容 /// </summary> public string Body { get; set; }
}
public class RabbitMqSettings
{ public string Hostname { get; set; } public string Port { get; set; } public string Username { get; set; } public string Password { get; set; }
}
消費者
RabiitMQClient
static void Main(string[] args)
{ var factory = new ConnectionFactory { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" }; using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); channel.QueueDeclare(queue: "email_queue", durable: true,//是否持久化 exclusive: false,//是否排他 autoDelete: false,//是否自動刪除 arguments: null);//參數 //這里可以設置prefetchCount的值,表示一次從隊列中取多少條消息,默認是1,可以根據需要設置 //這里設置了prefetchCount為1,表示每次只取一條消息,然后處理完后再確認收到,這樣可以保證消息的順序性 //global是否全局 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] 正在等待消息..."); //創建消費者 var consumer = new EventingBasicConsumer(channel); //注冊事件處理方法 consumer.Received += (model, ea) => { byte[] body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); var email = JsonConvert.DeserializeObject<EmailDto>(message); Console.WriteLine(" [x] 發送郵件 {0}", email.Email); //處理完消息后,確認收到 //multiple是否批量確認 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //開始消費 //queue隊列名 //autoAck是否自動確認,false表示手動確認 //consumer消費者 channel.BasicConsume(queue: "email_queue", autoAck: false, consumer: consumer); Console.WriteLine(" 按任意鍵退出"); Console.ReadLine();
}
一階段測試效果
一階段就是消費者和生產者能正常運行。
可以看到生產者發送郵件之后,消費者能夠正常消費請求。那么開始二階段,將郵件發送代碼完成,并實現能夠通過隊列處理郵件發送。對于郵件發送失敗就簡單的做下處理,相對較好的解決方案就是使用死信隊列,將發送失敗的消息放到死信隊列處理。
簡單的創建一個用于發送郵件的類,這里使用MailKit
庫發送郵件。
public class EmailService
{ private readonly SmtpClient client; public EmailService(SmtpClient client) { this.client = client; } public async Task SendEmailAsync(string from, string to, string subject, string body) {try{await client.ConnectAsync("smtp.163.com", 465, SecureSocketOptions.SslOnConnect); // 認證 await client.AuthenticateAsync("zy1767992919@163.com", ""); // 創建一個郵件消息 var message = new MimeMessage(); message.From.Add(new MailboxAddress("發件人名稱", from)); message.To.Add(new MailboxAddress("收件人名稱", to)); message.Subject = subject; // 設置郵件正文 message.Body = new TextPart("html") { Text = body }; // 發送郵件 var response =await client.SendAsync(message); // 斷開連接 await client.DisconnectAsync(true); }catch (Exception ex){// 斷開連接 await client.DisconnectAsync(true); throw new EmailServiceException("郵件發送失敗", ex); }}
} public class EmailServiceFactory
{ public EmailService CreateEmailService() { var client = new SmtpClient(); return new EmailService(client); }
}
public class EmailServiceException : Exception
{ public EmailServiceException(string message) : base(message) { } public EmailServiceException(string message, Exception innerException) : base(message, innerException) { }
}
接下來我們在消費者中調用郵件發送方法即可,如果不使用死信隊列,我們只需要在事件處理代碼加上郵件發送邏輯就行了。
consumer.Received += async (model, ea) =>
{byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);var email = JsonConvert.DeserializeObject<EmailDto>(message);// 創建一個EmailServiceFactory實例var emailServiceFactory = new EmailServiceFactory(); // 使用EmailServiceFactory創建一個EmailService實例 var emailService = emailServiceFactory.CreateEmailService(); // 調用EmailService的SendEmailAsync方法來發送電子郵件 string from = "zy1767992919@163.com"; // 發件人地址 string to = email.Email; // 收件人地址 string subject = email.Subject; // 郵件主題 string emailbody = email.Body; // 郵件正文 try { await emailService.SendEmailAsync(from, to, subject, emailbody); Console.WriteLine(" [x] 發送郵件 {0}", email.Email);} catch (Exception ex) { Console.WriteLine(" [x] 發送郵件失敗 " + ex.Message); //這里可以記錄日志//可以使用BasicNack方法,重新回到隊列,重新消費} //處理完消息后,確認收到//multiple是否批量確認channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
在上面中可以將發送失敗的郵件重新放隊列,多試幾次,這里就不做多余的介紹了。
完成效果展示
一封正確的郵件
ok,現在展示郵件發送Demo的完整展示。首先我們來寫一個正確的郵箱地址進行發送:
可以看到當我們發送請求之后,消費者正常消費了這條請求,同時郵件發送服務也正常執行。
多條發送郵件請求
那么接下來,我們通過Api測試工具,一次性發送多條郵件請求。其中包含正確的郵箱地址、錯誤的郵箱地址,看看消費者能不能正常消費呢~這里簡單的發送3條請求,2封正確的郵件地址,一封錯誤的,看看2封正常郵件地址的能不能正常發送出去。
這里有個問題,如果我填的郵件格式是正確的但是這個郵件地址是不存在的,他是能正常發送過去的,然后會被郵箱服務器退回來,這里不知道該怎么判斷是否發送成功。所以我這的錯誤地址是格式就不對的郵件地址,用來模擬因為網絡原因或者其他原因導致的郵件發送不成功。
可以看到3條請求都成功了,并且消費者接收到并正確消費了。2條正確郵件也收到了,1條錯誤的郵件也捕獲到了。
總結
本文通過使用RabiitMQ
點對點模式來完成一個發送郵件的小項目,通過隊列去處理郵件發送。通過RabbitMQ.Client
庫去連接RabbitMQ服務器。使用MailKit
庫發送郵件。通過使用RabbitMQ來避免郵件發送請求時間長的問題,同時能在消費者中重試、記錄發送失敗的郵件,來統一發送、統一處理。不足點就是被退回的郵件不知道該如何處理。可優化點:
-
可以使用
WorkQueues
工作隊列隊列模式將消息分發給多個消費者,適用于消息量較大的情況。 -
可以使用死信隊列處理發送失敗的郵件
文章轉載自:妙妙屋(zy)
原文鏈接:https://www.cnblogs.com/ZYPLJ/p/18279034
體驗地址:引邁 - JNPF快速開發平臺_低代碼開發平臺_零代碼開發平臺_流程設計器_表單引擎_工作流引擎_軟件架構