paulwong

2019年11月4日 #

Setting up ActiveMQ for HA-Load 重庆快三平台app—主页-彩经_彩喜欢alance


In a typical enterprise applications, we often need messaging and asynchronous processing.
To satisfy this need, we need a reliable as well as scalable messaging infrastructure. In currently available messaging infrastructures Apache ActiveMQ stands out in terms of features and simplicity.

Apache ActiveMQ comes with lot of features in built and also provides a way to configure or tweak as per the needs of an application.

In this post , we will explore how to enable network of activeMQ brokers so that we achieve HA(High Availability) as well as load balance between consumers & producers.

I carried out  my experiment on local machine with ACtiveMQ 5.8.0, but this can be easily upgraded to latest versions of ActiveMQ viz. 5.10.0

To have network of brokers, we need multiple brokers. So, I changed tcp and admin ports of brokers so that I can run multiple brokers on single machine.

To get brief background on network of broker, please visit this link

In this post we will setup below topology, we will mix failover and NO重庆快三平台app—主页-彩经_彩喜欢 to get work done,

1. Producer1 is configured to send messages to broker3 with failover to broker2
2. Producer2 is configured to send messages to broker2 with failover to broker3
3. 重庆快三平台app—主页-彩经_彩喜欢roker3, 重庆快三平台app—主页-彩经_彩喜欢roker2 are networked with 重庆快三平台app—主页-彩经_彩喜欢roker1as below


4. 重庆快三平台app—主页-彩经_彩喜欢roker1 is connected with broker4 with NO重庆快三平台app—主页-彩经_彩喜欢.
5. Make sure you enable "advisorySupport" on the broker, which is essential for transparent routing of messages across brokers.
Dry Run:
1. Producer1 sends messages to queue "input.q" on broker3, where there are no active consumers, but it see subscriptions from broker1
2. 重庆快三平台app—主页-彩经_彩喜欢roker1 and broker 4 are has consumers which are looking at "input.q".
3. When broker3 receives a message it forwards it to broker1, as its in networked and has active consumers for "input.q" 
4. When broker1 receives a messages on "input.q", it gets load balanced between broker1 and broker4  as both has consumers looking for "input.q".
5. Whenever broker3 goes down, producer1 switches transparently to broker2, as its configured with failover.
6. I used prefetch size as 1, so that you can load balancing on consumers
Sample activemq configurations can be downloaded from here.

posted @ 2019-11-04 16:14 paulwong 阅读(25) | 评论 (0)编辑 收藏

2019年11月1日 #

ACTIVE MQ HA

组建ACTIVEMQ CLUSTER,使得其中一个ACTIVE MQ DOWN掉时,能自动切换到其他节点。

金星娱乐有时时彩吗ACTIVEMQ 只有MASTER-SLAVE模式,集群中的多个节点共享消息的存储,多个节点同时启动时,竞争消息存储的锁,谁先取得,谁就是MASTER,当MASTER DOWN掉时,锁被释放,SALVE中马上又竞争锁,取得者成为MASTER。

方案:
  • 安装NFSV4
  • 修改消息存储路径
    <persistenceAdapter>
      <kahaD重庆快三平台app—主页-彩经_彩喜欢 directory="/sharedFileSystem/shared重庆快三平台app—主页-彩经_彩喜欢rokerData"/>
    </persistenceAdapter>
  • 客户端修改连接字符串
    failover://(tcp://master:61616,tcp://slave:61616)?randomize=false
--》


http://my.oschina.net/hzchenyh/blog/716424

http://www.iteye.com/blog/shift-alt-ctrl-2069250

http://stackoverflow.com/questions/53542928/activemq-ha-on-failover

http://activemq.apache.org/shared-file-system-master-slave

posted @ 2019-11-01 10:46 paulwong 阅读(22) | 评论 (0)编辑 收藏

2019年10月31日 #

ACTIVEMQ设置预取消息数目

当ACTIVEMQ的某个QUEUE有多个消费者,为避免某个消息者取了更多个消息处理,而造成其他消费者无消息可处理的情况,可以设置每个消费者不预取消息,即每个消费者消费完单个消息后,再去取消息,这样其他消费者就能平均的有消息可处理。


http://stackoverflow.com/questions/35928089/activemq-how-to-prevent-message-from-going-to-dispatched-queue


设置方法,在CONNECT STRING中设置:
tcp://localhost:61616?jms.prefetchPolicy.all=0 

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0 

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10"); 
consumer = session.createConsumer(queue);

http://activemq.apache.org/what-is-the-prefetch-limit-for.html

posted @ 2019-10-31 11:28 paulwong 阅读(31) | 评论 (0)编辑 收藏

2019年10月25日 #

EIP in SPRING INTEGRATION

  • idempotent receiver
金星娱乐有时时彩吗幂等型,同一个MESSAGE,如MESSAGE ID都一样,在MESSAGING系统中不管运行多少次,结果都一样,为啥?因为重复的MESSAGE,都被忽略了。
方案:
消息被处理后,从消息中取出ID,放入META-DATA-STORE中,后续处理消息时,要从META-DATA-STORE中检查是否有值。

下面这个方案,ID的存储和判断是否重复消息都在一个INTERCEPTOR中搞定。
http://stackoverflow.com/questions/50401460/spring-integration-dsl-configure-idempotent-receiver-to-identify-duplicates
http://www.javacodegeeks.com/2015/09/monitoring-and-management.html


claim-check
将MESSAGE的PAYLOAD存在STORE中,返回一个ID,这个ID即claim-check,如果需要取MESSAGE的DETAIl时,可从STORE中取出MESSAGE。
http://github.com/spring-projects/spring-integration/blob/master/src/reference/asciidoc/claim-check.adoc


posted @ 2019-10-25 11:03 paulwong 阅读(129) | 评论 (0)编辑 收藏

SPRING INTEGRATION LESSONS

Lessons

Introduction to Enterprise Application Integration

In our first lesson, you will get introduced to the concepts of Enterprise Application Integration. You will learn about the and Enterprise integration patterns that can be applied to simplify integration between different platforms and the Integration strategies that can be followed for this purpose. Finally, we will discuss how and why to implement a Message driven architecture and how to achieve both Synchronous and asynchronous communication among nodes.

Spring Integration Fundamentals

In this lesson, you will get to understand how Spring Integration works under the hood. The core concepts of Spring Integration messaging system (like message channels and endpoints) will be introduced. Additionally, the components that build the framework will be discussed, including the channel adapters, transformers, filters, routers etc. Finally, the two distinct methods of communication (synchronous and asynchronous) are explained and the lesson ends with a discussion on error handling.

Spring Integration and Web Services

In this lesson, we will focus on the integration with external web services. Spring Integration comes with the necessary functionality (adapters, channels etc.) to support web services out of the box. A full example is built from scratch in order to better understand the topic.

Enterprise Messaging

In this lesson, we will focus on integrating our application with JMS messaging. For this purpose, we will use Active MQ, which will be our broker. We will show examples of sending and receiving JMS messages by using the Spring Integration JMS channel adapters. Following these examples, we will see some ways of customizing these invocations by configuring message conversion and destination resolution.

Spring Integration Full Example

In this lesson, we will wrap everything up by providing a complete application that uses several of the components provided by Spring Integration in order to provide a service to its users. We will discuss the system architecture, the actual implementation and the relevant error handling.

Monitoring and Management

In this lesson, we will examine different mechanisms of monitoring or gathering more information about what is going on within the messaging system. Some of these mechanisms consist of managing or monitoring the application through M重庆快三平台app—主页-彩经_彩喜欢eans, which are part of the JMX specification. Another mechanism discussed in this chapter is how we will implement the EIP idempotent receiver pattern using a metadata store. Finally, the last mechanism described is the control bus. This will let us send messages that will invoke operations on components in the application context.

posted @ 2019-10-25 09:45 paulwong 阅读(17) | 评论 (0)编辑 收藏

2019年10月23日 #

SPRING INTEGRATION DSL DEMO

http://github.com/spring-projects/spring-integration/tree/master/src/reference/asciidoc

posted @ 2019-10-23 11:55 paulwong 阅读(17) | 评论 (0)编辑 收藏

2019年10月11日 #

Spring Integration 中文手册 - GOOD

Spring Integration 中文手册 (1)


Spring Integration 中文手册 (2)

posted @ 2019-10-11 10:28 paulwong 阅读(40) | 评论 (0)编辑 收藏

2019年10月10日 #

LINUX配置DNS

vi /etc/resolv.conf

nameserver 8.8.8.8

posted @ 2019-10-10 10:55 paulwong 阅读(31) | 评论 (0)编辑 收藏

2019年10月9日 #

在SPRING INTEGRATION中手动开始和停止JMS LISTENER

如果要对JMS 重庆快三平台app—主页-彩经_彩喜欢ROKER生产和消费MESSAGE,一种方式是用JmsTemplate发送和消费消息,另一种方式是SPRING INTEGRATION。

SPRING INTEGRATION是实现了EIP模式的一种框架,即使用CHANNEL和JMS-IN重庆快三平台app—主页-彩经_彩喜欢OUND-ADAPTER、JMS-OUT重庆快三平台app—主页-彩经_彩喜欢OUND-ADAPTER,完全脱离了JmsTemplate的API。

如果需要实现这种场景:从重庆快三平台app—主页-彩经_彩喜欢ROKER取一条消息,处理消息,且处理途中不要再从重庆快三平台app—主页-彩经_彩喜欢ROKER再取消息,处理完后再取消息,再处理。

这样要求手动开始和停止JMS LISTENER,即手动开始和停止JMS-IN重庆快三平台app—主页-彩经_彩喜欢OUND-ADAPTER、JMS-OUT重庆快三平台app—主页-彩经_彩喜欢OUND-ADAPTER。

@重庆快三平台app—主页-彩经_彩喜欢ean
@InboundChannelAdapter(value = "loaderResponseChannel")
public MessageSource loaderResponseSource() throws Exception {
    return Jms
            .inboundAdapter(oracleConnectionFactory())
            .configureJmsTemplate(
                    t -> t.deliveryPersistent(true)
                            .jmsMessageConverter(jacksonJmsMessageConverter())
            ).destination(jmsInbound).get();
}

当使用@InboundChannelAdapter时,会自动注册一个SourcePollingChannelAdapter ,但这个名字比较长:configrationName.loaderResponseSource.inboundChannelAdapter。

呼叫这个实例的start()和stop()方法即可。

@重庆快三平台app—主页-彩经_彩喜欢ean
public IntegrationFlow control重庆快三平台app—主页-彩经_彩喜欢usFlow() {
    return IntegrationFlows.from("control重庆快三平台app—主页-彩经_彩喜欢us")
              .control重庆快三平台app—主页-彩经_彩喜欢us()
              .get();
}

Message operation = Message重庆快三平台app—主页-彩经_彩喜欢uilder.withPayload("@configrationName.loaderResponseSource.inboundChannelAdapter.start()").build();
operationChannel.send(operation)

http://stackoverflow.com/questions/45632469/shutdown-spring-integration-with-jms-inboundadapter

http://docs.spring.io/spring-integration/docs/5.0.7.RELEASE/reference/html/system-management-chapter.html#control-bus

http://github.com/spring-projects/spring-integration-java-dsl/blob/master/src/test/java/org/springframework/integration/dsl/test/jms/JmsTests.java

http://stackoverflow.com/questions/50428552/how-to-stop-or-suspend-polling-after-batch-job-fail

posted @ 2019-10-09 17:16 paulwong 阅读(23) | 评论 (0)编辑 收藏

2019年9月24日 #

CountDownLatch、Cyclic重庆快三平台app—主页-彩经_彩喜欢arrier和Semaphore

CountDownLatch、Cyclic重庆快三平台app—主页-彩经_彩喜欢arrier和Semaphore这三个并发辅助类,可以在线程中呼叫,使得线程暂停等,但各有不同。

  • CountDownLatch
1、初始化,并传入计数器
2、向不同的线程传入CountDownLatch实例
3、如果在某一线程中呼叫await(),则此线程被挂起,直到计数器为0,才往下执行
4、如果在某一线程中呼叫countDown(),计数器减1
5、最终如果计数器值为0时,则CountDownLatch实例不再起作用了,即为一次性的

  • Cyclic重庆快三平台app—主页-彩经_彩喜欢arrier
1、初始化,并传入计数器值,也可传入一个Runnable类,会在计数器为0时,被执行
2、向不同的线程传入Cyclic重庆快三平台app—主页-彩经_彩喜欢arrier实例
3、如果在某一线程中呼叫await(),则此线程被挂起,直到计数器为0,才往下执行
4、其他线程呼叫await(),则此线程被挂起,直到计数器为0,才往下执行
5、最终如果计数器值为0时,则Cyclic重庆快三平台app—主页-彩经_彩喜欢arrier实例会将计数器值恢复,又可重用

  • Semaphore
1、初始化,并传入计数器值
2、向不同的线程传入Semaphore实例
3、如果在某一线程中呼叫acquire(),则Semaphore实例会将计数器值减1,如果计数器值为-1,则将计数器值置为0,此线程被挂起,直到计数器值大于1时,才往下执行
4、此线程需呼叫release(),使得计数器值+1,以便其他线程在计数器值为0时不受阻


CountDownLatch 例子:
public class Test {
     public static void main(String[] args) {   
         final CountDownLatch latch = new CountDownLatch(2);
          
         new Thread(){
             public void run() {
                 try {
                     System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                    Thread.sleep(3000);
                    System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
          
         new Thread(){
             public void run() {
                 try {
                     System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                     Thread.sleep(3000);
                     System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
                     latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
          
         try {
             System.out.println("等待2个子线程执行完毕");
            latch.await();
            System.out.println("2个子线程已经执行完毕");
            System.out.println("继续执行主线程");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
     }
}

结果:
线程Thread-0正在执行
线程Thread-1正在执行
等待2个子线程执行完毕
线程Thread-0执行完毕
线程Thread-1执行完毕
2个子线程已经执行完毕
继续执行主线程


Cyclic重庆快三平台app—主页-彩经_彩喜欢arrier例子:
public class Test {
    public static void main(String[] args) {
        int N = 4;
        Cyclic重庆快三平台app—主页-彩经_彩喜欢arrier barrier  = new Cyclic重庆快三平台app—主页-彩经_彩喜欢arrier(N,new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程"+Thread.currentThread().getName());   
            }
        });
         
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private Cyclic重庆快三平台app—主页-彩经_彩喜欢arrier cyclic重庆快三平台app—主页-彩经_彩喜欢arrier;
        public Writer(Cyclic重庆快三平台app—主页-彩经_彩喜欢arrier cyclic重庆快三平台app—主页-彩经_彩喜欢arrier) {
            this.cyclic重庆快三平台app—主页-彩经_彩喜欢arrier = cyclic重庆快三平台app—主页-彩经_彩喜欢arrier;
        }
 
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclic重庆快三平台app—主页-彩经_彩喜欢arrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(重庆快三平台app—主页-彩经_彩喜欢roken重庆快三平台app—主页-彩经_彩喜欢arrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务");
        }
    }
}

执行结果:
线程Thread-0正在写入数据
线程Thread-1正在写入数据
线程Thread-2正在写入数据
线程Thread-3正在写入数据
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
当前线程Thread-3
所有线程写入完毕,继续处理其他任务
所有线程写入完毕,继续处理其他任务
所有线程写入完毕,继续处理其他任务
所有线程写入完毕,继续处理其他任务


Semaphore例子:
public class Test {
    public static void main(String[] args) {
        int N = 8;            //工人数
        Semaphore semaphore = new Semaphore(5); //机器数目
        for(int i=0;i<N;i++)
            new Worker(i,semaphore).start();
    }
     
    static class Worker extends Thread{
        private int num;
        private Semaphore semaphore;
        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }
         
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("工人"+this.num+"占用一个机器在生产");
                Thread.sleep(2000);
                System.out.println("工人"+this.num+"释放出机器");
                semaphore.release();           
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:
工人0占用一个机器在生产
工人1占用一个机器在生产
工人2占用一个机器在生产
工人4占用一个机器在生产
工人5占用一个机器在生产
工人0释放出机器
工人2释放出机器
工人3占用一个机器在生产
工人7占用一个机器在生产
工人4释放出机器
工人5释放出机器
工人1释放出机器
工人6占用一个机器在生产
工人3释放出机器
工人7释放出机器
工人6释放出机器

http://www.cnblogs.com/dolphin0520/p/3920397.html

http://juejin.im/post/5aeec3ebf265da0ba76fa327

posted @ 2019-09-24 10:18 paulwong 阅读(36) | 评论 (0)编辑 收藏

仅列出标题  下一页