C#中关于ActiveMQ的应用详解

activemq是个好东东,不必多说。activemq提供多种语言支持,如java, c, c++, c#, ruby, perl, python, php等。由于我在windows下开发gui,比较关心c++和c#,其中c#的activemq很简单,apache提供nms(.net messaging service)支持.net开发,只需如下几个步骤即能建立简单的实现。c++的应用相对麻烦些,稍后写文章介绍。

1、去ActiveMQ官方网站下载最新版的ActiveMQ,我之前下的是5.3.1,5.3.2现在也已经出来了。

2、去ActiveMQ官方网站下载最新版的Apache.NMS,需要下载Apache.NMS和Apache.NMS.ActiveMQ两个bin包,如果对源码感兴趣,也可下载src包。这里要提醒一下,如果下载1.2.0版本的NMS.ActiveMQ,Apache.NMS.ActiveMQ.dll在实际使用中有个bug,即停止ActiveMQ应用时会抛WaitOne函数异常,查看src包中的源码发现是由于Apache.NMS.ActiveMQ-1.2.0-srcsrcmaincsharpTransportInactivityMonitor.cs中的如下代码造成的,修改一下源码重新编译即可。看了一下最新版1.3.0已经修复了这个bug,因此下载最新版即可。

private void StopMonitorThreads()           {               lock(monitor)               {                   if(monitorStarted.CompareAndSet(true, false))                   {                       AutoResetEvent shutdownEvent = new AutoResetEvent(false);                       // Attempt to wait for the Timers to shutdown, but don't wait                       // forever, if they don't shutdown after two seconds, just quit.                       this.readCheckTimer.Dispose(shutdownEvent);                       shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));                       this.writeCheckTimer.Dispose(shutdownEvent);                       shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));                                                       //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext)                       this.asyncTasks.Shutdown();                       this.asyncTasks = null;                       this.asyncWriteTask = null;                       this.asyncErrorTask = null;                   }               }           }       private void StopMonitorThreads()         {             lock(monitor)             {                 if(monitorStarted.CompareAndSet(true, false))                 {                     AutoResetEvent shutdownEvent = new AutoResetEvent(false);                    // Attempt to wait for the Timers to shutdown, but don't wait                     // forever, if they don't shutdown after two seconds, just quit.                     this.readCheckTimer.Dispose(shutdownEvent);                     shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));                     this.writeCheckTimer.Dispose(shutdownEvent);                     shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));                                                     //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext)                     this.asyncTasks.Shutdown();                     this.asyncTasks = null;                     this.asyncWriteTask = null;                     this.asyncErrorTask = null;                 }             }         }

登录后复制

3、运行ActiveMQ,找到ActiveMQ解压后的bin文件夹:…pache-activemq-5.3.1in,执行activemq.bat批处理文件即可启动ActiveMQ服务器,默认端口为61616,这可在配置文件中修改。

4、写C#程序实现ActiveMQ的简单应用。新建C#工程(一个Producter项目和一个Consumer项目),WinForm或Console程序均可,这里建的是Console工程,添加对Apache.NMS.dll和Apache.NMS.ActiveMQ.dll的引用,然后即可编写实现代码了,简单的Producer和Consumer实现代码如下:

producer:

using System;   using System.Collections.Generic;   using System.Text;   using Apache.NMS;   using Apache.NMS.ActiveMQ;   using System.IO;   using System.Xml.Serialization;   using System.Runtime.Serialization.Formatters.Binary;   namespace Publish   {       class Program       {           static void Main(string[] args)           {               try              {                   //Create the Connection Factory                   IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");                   using (IConnection connection = factory.CreateConnection())                   {                       //Create the Session                       using (ISession session = connection.CreateSession())                       {                           //Create the Producer for the topic/queue                           IMessageProducer prod = session.CreateProducer(                               new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));                           //Send Messages                           int i = 0;                           while (!Console.KeyAvailable)                           {                               ITextMessage msg = prod.CreateTextMessage();                               msg.Text = i.ToString();                               Console.WriteLine("Sending: " + i.ToString());                               prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);                               System.Threading.Thread.Sleep(5000);                               i++;                           }                       }                   }                   Console.ReadLine();              }               catch (System.Exception e)               {                   Console.WriteLine("{0}",e.Message);                   Console.ReadLine();               }           }       }   }

登录后复制

consumer:

using System;   using System.Collections.Generic;   using System.Text;   using Apache.NMS;   using Apache.NMS.ActiveMQ;   using System.IO;   using System.Xml.Serialization;   using System.Runtime.Serialization.Formatters.Binary;   namespace Subscribe   {       class Program       {           static void Main(string[] args)           {               try              {                   //Create the Connection factory                   IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");                   //Create the connection                   using (IConnection connection = factory.CreateConnection())                   {                       connection.ClientId = "testing listener";                       connection.Start();                       //Create the Session                       using (ISession session = connection.CreateSession())                       {                           //Create the Consumer                           IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);                           consumer.Listener += new MessageListener(consumer_Listener);                           Console.ReadLine();                       }                       connection.Stop();                       connection.Close();                   }               }               catch (System.Exception e)               {                   Console.WriteLine(e.Message);               }           }           static void consumer_Listener(IMessage message)           {               try              {                   ITextMessage msg = (ITextMessage)message;                   Console.WriteLine("Receive: " + msg.Text);              }               catch (System.Exception e)               {                   Console.WriteLine(e.Message);               }           }       }   }

登录后复制

程序实现的功能:生产者producer建立名为testing的主题,并每隔5秒向该主题发送消息,消费者consumer订阅了testing主题,因此只要生产者发送testing主题的消息到ActiveMQ服务器,服务器就将该消息发送给订阅了testing主题的消费者。

编译生成producer.exe和consumer.exe,并执行两个exe,即可看到消息的发送与接收了。

这个例子是建的主题(Topic),ActiveMQ还支持另一种方式:Queue,即P2P,两者有什么区别呢?区别在于,Topic是广播,即如果某个Topic被多个消费者订阅,那么只要有消息到达服务器,服务器就将该消息发给全部的消费者;而Queue是点到点,即一个消息只能发给一个消费者,如果某个Queue被多个消费者订阅,没有特殊情况的话消息会一个一个地轮流发给不同的消费者,比如:

msg1–>consumer A

msg2–>consumer B

msg3–>consumer C

msg4–>consumer A

msg5–>consumer B

msg6–>consumer C

特殊情况是指:ActiveMQ支持过滤机制,即生产者可以设置消息的属性(Properties),该属性与消费者端的Selector对应,只有消费者设置的selector与消息的Properties匹配,消息才会发给该消费者。Topic和Queue都支持Selector。

Properties和Selector该如何设置呢?请看如下代码:

producer:

public void SetProperties(){ITextMessage msg = prod.CreateTextMessage();                               msg.Text = i.ToString();                               msg.Properties.SetString("myFilter", "test1");                               Console.WriteLine("Sending: " + i.ToString());                               prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);  ITextMessage msg = prod.CreateTextMessage();                             msg.Text = i.ToString();                             msg.Properties.SetString("myFilter", "test1");                             Console.WriteLine("Sending: " + i.ToString());                             prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);}

登录后复制

consumer:

public void SetSelector(){//生成consumer时通过参数设置Selector   IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");  //生成consumer时通过参数设置Selector IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");}

登录后复制

以上就是C#中关于ActiveMQ的应用详解的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2493360.html

(0)
上一篇 2025年3月5日 01:30:21
下一篇 2025年2月25日 22:43:46

AD推荐 黄金广告位招租... 更多推荐

相关推荐

  • .net中关于异步性能测试的示例代码

    很久没有写博客了,今年做的产品公司这两天刚刚开了发布会,稍微清闲下来,想想我们做的产品还有没有性能优化空间,于是想到了.net的异步可以优化性能,但到底能够提升多大的比例呢?恰好有一个朋友正在做各种语言的异步性能测试(有关异步和同步的问题,…

    2025年3月5日 编程技术
    200
  • .NET Core中遇到的一些坑的图文详解

     最近.net core升级到2.0后开始慢慢捣鼓的多了起来,但遇到了不少坑,所以特来记录下。 第一个坑  条件编译符   我们在编写一些方法的时候通常会为Debug模式增加一些输出日志等以便我们检查,也会为Release模式增加或修改一些…

    2025年3月5日 编程技术
    200
  • C#实现表格隔行换色

    这篇文章主要介绍了c# 根据表格偶数、奇数加载不同颜色,需要的朋友可以参考下 效果图:         //偶数随机  Random evenRanm = new Random();  //奇数随机  Random oddRanm = ne…

    2025年3月5日
    200
  • C#中的抽象类与接口的详解

    问题出现: 我们在使用C#的抽象类和接口的时候,往往会遇到以下类似的问题,大致归纳如下: (1)抽象类和接口有什么本质的区别和联系? (2)什么时候选择使用抽象类,然啥时候使用接口最恰当呢? (3)在项目中怎样使用才能使得项目更具有可维护性…

    编程技术 2025年3月5日
    200
  • C#中关于程序功能实现以及对代码选择的思考

          接触c#语言只有短短几天时间,想要写出什么高大上的深入性研究文章,估计也是满篇的猜想和一些没有逻辑的推断。截至目前而言,从语言入门知识(大多数程序员的入门仪式——输出“hello,world!”)、数据和数据类型、数据运算、程序…

    2025年3月5日
    200
  • .NET支付宝App支付接入的实例分析

    一、前言        最近也是为了新产品忙得起飞,博客都更新的慢了。新产品为了方便用户支付,需要支付宝扫码接入。这活落到了我的身上。产品是Windows系统下的桌面软件,通过软件生成二维码支付。界面以原生的MVVM编写,下面叙述一下基本的…

    编程技术 2025年3月5日
    200
  • C#单例模式的实现以及性能对比的实例

    这篇文章主要介绍了浅谈c#单例模式的实现和性能对比的相关资料,详细的介绍了6种实现方式,需要的朋友可以参考下 简介 单例指的是只能存在一个实例的类(在C#中,更准确的说法是在每个AppDomain之中只能存在一个实例的类,它是软件工程中使用…

    编程技术 2025年3月5日
    200
  • .Net Core之实现下载文件的实例

    本篇将和大家分享的丝.netcore下载文件,常见的下载有两种:a标签直接指向下载文件地址和post或get请求后台输出文件流的方式,本篇也将围绕这两种来分享;如果对您有好的帮助,请多多支持。 允许站点不识别content-type下载文件…

    2025年3月5日 编程技术
    200
  • C# ArrayListd的长度问题解决

    c# arraylistd的长度问题解决 namespace ArrayListd的长度问题{   class Program   {       static void Main(string[] args)       {       …

    编程技术 2025年3月5日
    200
  • C# 加密类工具实例分析

    5.SHA1加密 //sha1加密    public static String getSha1(String str){        if(str==null||str.length()==0){            return …

    编程技术 2025年3月5日
    200

发表回复

登录后才能评论