加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

使用JAVA实现RabbitMQ的发送告警

发布时间:2020-12-15 03:14:42 所属栏目:Java 来源:网络整理
导读:今天PHP站长网 52php.cn把收集自互联网的代码分享给大家,仅供参考。 基于rabbitmq实战4.2章节发送即忘模型,书上是使用PiPa库完成,这里使用java实现 package com.rayootech.rabbitmq.demo.inaction.alertwarning;import

以下代码由PHP站长网 52php.cn收集自互联网

现在PHP站长网小编把它分享给大家,仅供参考

基于rabbitmq实战4.2章节发送即忘模型,书上是使用PiPa库完成,这里使用java实现
package com.rayootech.rabbitmq.demo.inaction.alertwarning;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class AlertWarningProducer {
    private static final String EXCHANGE_NAME = "alerts";
    private static final String ROUTING_KEY = "critical.alert";
    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("rabbitmq128");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);

            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME,"topic",true);

            String message = "critical content!!!";

            channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,message.getBytes());
            System.out.println(" [x] Sent '" + ROUTING_KEY + "':'" + message + "'");

        }
        catch  (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                }
                catch (Exception ignore) {}
            }
        }
    }
}
import com.rabbitmq.client.*;

import javax.mail.*;
import javax.mail.Message.RecipientType;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeoutException;


public class AlertWarningConsumer {
    private  final static  String EMAIL_RECIPIENTS="********@163.com";//接收者邮箱
    private  final static  String EMAIL_SENDER="********@163.com";//发送者邮箱
    private  final static  String EXCHANGE="alerts";//topic类型交换器
    private  final static  String TYPE="topic";
    private  final static  String QUEUE1="critical";
    private  final static  String QUEUE2="rate_limt";
    private  final static  String ROUTING_KEY1="critical.*";
    private  final static  String ROUTING_KEY2="*.rate_limt";
    /**
     *
     * @param recipients    接收人
     * @param subject       发送主题
     * @param msg       发送消息内容
     * @throws IOException
     * @throws MessagingException
     */
    public static void sendEmail(String recipients,String subject,Object msg) throws IOException,MessagingException {
        final Properties props = new Properties();
        /*
         * 可用的属性: mail.store.protocol / mail.transport.protocol / mail.host /
         * mail.user / mail.from
         */
        // 表示SMTP发送邮件,需要进行身份验证
        props.put("mail.smtp.auth","true");
        props.put("mail.smtp.host","smtp.163.com");
        // 发件人的账号
        props.put("mail.user",EMAIL_SENDER);
        // 访问SMTP服务时需要提供的密码
        props.put("mail.password","*******");

        // 构建授权信息,用于进行SMTP进行身份验证
        Authenticator authenticator = new Authenticator() {
            @Override
            protected PasswordAuthentication getPasswordAuthentication() {
                // 用户名、密码
                String userName = props.getProperty("mail.user");
                String password = props.getProperty("mail.password");
                return new PasswordAuthentication(userName,password);
            }
        };
        // 使用环境属性和授权信息,创建邮件会话
        Session mailSession = Session.getInstance(props,authenticator);
        // 创建邮件消息
        MimeMessage message = new MimeMessage(mailSession);
        // 设置发件人
        InternetAddress form = new InternetAddress(
                props.getProperty("mail.user"));
        message.setFrom(form);

        // 设置收件人
        InternetAddress to = new InternetAddress(recipients);
        message.setRecipient(RecipientType.TO,to);

        // 设置邮件标题
        message.setSubject(subject);
        // 设置邮件的内容体{"message":"告警消息邮件发送"}
        message.setContent(msg,"application/json;charset=UTF-8");
        // 发送邮件
        Transport.send(message);
    }
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = null;

        try {

            factory.setPort(5672);
            factory.setHost("rabbitmq128");
            factory.setUsername("admin");
            factory.setPassword("admin");
            connection = factory.newConnection();
            //创建连接
            final Channel channel = connection.createChannel();
            //声明交换器队列绑定等信息
            channel.exchangeDeclare(EXCHANGE,TYPE,true);
            channel.queueDeclare(QUEUE1,false,null);
            channel.queueBind(QUEUE1,EXCHANGE,ROUTING_KEY1);

            channel.queueDeclare(QUEUE2,null);
            channel.queueBind(QUEUE2,ROUTING_KEY2);

            Consumer rate_limit_notify = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {
                    String subject = "rate_limit Alert";
                    String msg = new String(body,"UTF-8");
                    try {
                        sendEmail(EMAIL_RECIPIENTS,subject,msg);
                    } catch (MessagingException e) {
                        e.printStackTrace();
                    }
                    System.out.println("send alert E-mail!Alert text:Recipients: " + EMAIL_RECIPIENTS+" subject: "+subject);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };

            Consumer critical_notify = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag,byte[] body) throws IOException {
                    String subject = "Critical Alert";
                    String msg = new String(body,false);
                }
            };
            //消息消费
            channel.basicConsume(QUEUE1,"critical",critical_notify);
            channel.basicConsume(QUEUE2,"rate_limit",rate_limit_notify);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

以上内容由PHP站长网【52php.cn】收集整理供大家参考研究

如果以上内容对您有帮助,欢迎收藏、点赞、推荐、分享。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读