Memorphosis

Memorphosis

Memorphosis是一个消息中间件,它是linkedin开源MQ——kafkaJava版本,针对淘宝内部应用做了定制和优化。Metamorphosis的设计原则

· 消息都是持久的,保存在磁盘

· 吞吐量第一

· 消费状态保存在客户端

· 分布式,生产者、服务器和消费者都可分布



Metamorphosis的特点

除了完整实现kafka的功能之外,我们还为meta加入了额外的功能,使得meta成为一个更为强大的通用消息中间件,包括

· 彻底用java重写的实现,高效的协议和通讯框架

· 发送端的负载均衡

· Master/Slave异步和同步复制的高可用方案

· 专门用于广播消息的客户端实现

· diamond结合使用的顺序发送消息功能

· 支持事务,包括本地事务和分布式事务,实现JTA规范。



Getting started

我们在日常已经部署了metamorhposis环境,因此你可以直接在本地测试,如果你想部署一个自己的服务器,可以参照#服务器部署节。

前面提到,meta是一个消息中间件。消息中间件中有两个角色:消息生产者和消息消费者。Meta里同样有这两个概念,消息生产者负责创建消息并发送到meta服务器,meta服务器会将消息持久化到磁盘,消息消费者从meta服务器拉取消息并提交给应用消费。


消息会话工厂类

在使用消息生产者和消费者之前,我们需要创建它们,这就需要用到消息会话工厂类——MessageSessionFactory,由这个工厂帮你创建生产者或者消费者。除了这些,MessageSessionFactory还默默无闻地在后面帮你做很多事情,包括

1. 服务的查找和发现,通过diamond和zookeeper帮你查找日常的meta服务器地址列表

2. 连接的创建和销毁,自动创建和销毁到meta服务器的连接,并做连接复用,也就是到同一台meta的服务器在一个工厂内只维持一个连接。

3. 消息消费者的消息存储和恢复,后续我们会谈到这一点。

4. 协调和管理各种资源,包括创建的生产者和消费者的。

因此,我们首先需要创建一个会话工厂类,MessageSessionFactory仅是一个接口,它的实现类是MetaMessageSessionFactory

MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(new MetaClientConfig());

请注意,MessageSessionFactory应当全局共用一个


消息生产者

翠花,上代码

public class Producer {
    public static void main(String[] args) throws Exception {
        // New session factory,强烈建议使用单例
        MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(new MetaClientConfig());
        // create producer,强烈建议使用单例
        MessageProducer producer = sessionFactory.createProducer();
        // publish topic
        final String topic = "meta-test";
        producer.publish(topic);

 

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        String line = null;
        while ((line = reader.readLine()) != null) {
            // send message
            SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));
            // check result
            if (!sendResult.isSuccess()) {
                System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
            }
            else {
                System.out.println("Send message successfully,sent to " + sendResult.getPartition());
            }
        }
    }

 

}


MessageProducerMessageSessionFactory的createProducer方法即可以创建一个生产者。在Meta里,每个消息对象都是Message类的实例,Message表示一个消息对象,它包含这么几个属性:

属性

id

消息的唯一id,系统自动产生,用户无法设置,在发送成功后由服务器返回,发送失败则为0。

topic

消息的主题,订阅者订阅该主题即可接收发送到该主题下的消息,必须

data

消息的有效载荷,也就是消息内容,meta永远不会修改消息内容,你发送出去是什么样子,接收到就是什么样子。消息内容限制在1M以内,我的建议是最好不要发送超过上百K的消息,必须

attribute

消息属性,一个字符串,可选。发送者可设置消息属性来让消费者过滤。

之前还调用了MessageProducerpublish(topic)方法

producer.publish(topic);

这一步在发送消息前是必须的,你必须发布你将要发送消息的topic,这是为了让会话工厂帮你去查找接收这些topic的meta服务器地址并初始化连接。这个步骤针对每个topic只需要做一次,多次调用无影响。

总结下这个例子,从标准输入读入你输入的数据,并将数据封装成一个Message对象,发送到topic为meta-test下。

请注意,MessageProducer是线程安全的,完全可重复使用,因此最好在应用中作为单例来使用,一次创建,到处使用,配置为spring里的singleton bean。MessageProducer创建的代价昂贵,每次都需要通过zk查找服务器并创建tcp长连接。



[] 消息消费者

发送消息后,消费者可以接收消息了,下面的代码创建消费者并订阅meta-test这个主题,等待消息送达并打印消息内容;

public class AsyncConsumer {
    public static void main(String[] args) throws Exception {
        // New session factory,强烈建议使用单例
        MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(new MetaClientConfig());
        // subscribed topic
        final String topic = "meta-test";
        // consumer group
        final String group = "meta-example";
        // create consumer,强烈建议使用单例
        MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
        // subscribe topic
        consumer.subscribe(topic, 1024 * 1024, new MessageListener() {

 

            public void recieveMessages(Message message) {
                System.out.println("Receive message " + new String(message.getData()));
            }

 

 

            public Executor getExecutor() {
                // Thread pool to process messages,maybe null.
                return null;
            }
        });
        // complete subscribe
        consumer.completeSubscribe();

 

    }

 

}

通过createConsumer方法来创建MessageConsumer,注意到我们传入一个ConsumerConfig参数,这是消费者的配置对象。每个消息者都必须有一个ConsumerConfig配置对象,我们这里只设置了group属性,这是消费者的分组名称。Meta的Producer、Consumer和Broker都可以为集群。消费者可以组成一个集群共同消费同一个topic,发往这个topic的消息将按照一定的负载均衡规则发送给集群里的一台机器。同一个消费者集群必须拥有同一个分组名称,也就是同一个group,这个概念跟notify里的订阅者组名是一样的。我们这里将分组名称设置为meta-example。

订阅消息通过subscribe方法,这个方法接受三个参数

· topic,订阅的主题

· maxSize,因为meta是一个消费者主动拉取的模型,这个参数规定每次拉取的最大数据量,单位为字节,这里设置为1M,最大为1M。

· MessageListener,消息监听器,负责消息消息。

MessageListener的接口方法如下

public interface MessageListener {
    /**
     * 接收到消息列表,只有messages不为空并且不为null的情况下会触发此方法
     *
     * @param messages
     */
    public void recieveMessages(Message message);

 

 

    /**
     * 处理消息的线程池
     *
     * @return
     */
    public Executor getExecutor();
}



消息的消费过程可以是一个并发处理的过程,getExecutor返回你想设置的线程池,每次消费都会在这个线程池里进行。recieveMessage方法用于实际的消息消费处理,message参数即为消费者收到的消息,它必不为null。

我们这里简单地打印收到的消息内容就完成消费。如果在消费过程中抛出任何异常,该条消息将会在一定间隔后重新尝试提交给MessageListener消费。在多次消费失败的情况下,该消息将会存储到消费者应用的本次磁盘,并在后台自动恢复重试消费。

细心的你一定还注意到,在调用subscribe之后,我们还调用了completeSubscribe方法来完成订阅过程。请注意,subscribe仅是将订阅信息保存在本地,并没有实际跟meta服务器交互,要使得订阅关系生效必须调用一次completeSubscribe,completeSubscribe仅能被调用一次,多次调用将抛出异常 为什么需要completeSubscribe方法呢,原因有二:

· 首先,subscribe方法可以被调用多次,也就是一个消费者可以消费多种topic

· 其次,如果每次调用subscribe都跟zk和meta服务器交互一次,代价太高

因此completeSubscribe一次性将所有订阅的topic生效,并处理跟zk和meta服务器交互的所有过程。

同样,MessageConsumer也是线程安全的,创建的代价不低,因此也应该尽量复用。

小结

上面的例子可以直接在您的机器上跑起来不过建议测试的时候使用自己的topic和消费者组名group,防止跟其他测试的开发者产生冲突

  • 发表于 2017-11-20 11:23
  • 阅读 ( 780 )
  • 分类:Hadoop

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
不写代码的码农
- Nightmare

33 篇文章

作家榜 »

  1. 威猛的小站长 116 文章
  2. Jonny 65 文章
  3. 江南烟雨 33 文章
  4. - Nightmare 33 文章
  5. doublechina 31 文章
  6. HJ社区-肖峰 29 文章
  7. 伪摄影 20 文章
  8. Alan 14 文章