加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > asp.Net > 正文

Azure Event Hub 技术研究系列3-Event Hub接收事件

发布时间:2020-12-16 08:51:15 所属栏目:asp.Net 来源:网络整理
导读:上篇博文中,我们通过编程的方式介绍了如何将事件消息发送到Azure Event Hub: Azure Event Hub 技术研究系列2-发送事件到Event Hub 本篇文章中,我们继续:从Event Hub中接收事件。 1. 新建控制台工程 EventHubReceiver 2. 添加Nuget引用 Microsoft.Azure.E

上篇博文中,我们通过编程的方式介绍了如何将事件消息发送到Azure Event Hub:

Azure Event Hub 技术研究系列2-发送事件到Event Hub

本篇文章中,我们继续:从Event Hub中接收事件。

1. 新建控制台工程 EventHubReceiver

2. 添加Nuget引用

Microsoft.Azure.EventHubs

Microsoft.Azure.EventHubs.Processor

3. 实现IEventProcessor接口

MyEventProcessor

 1     using Microsoft.Azure.EventHubs;
 2      Microsoft.Azure.EventHubs.Processor;
 3      System.Threading.Tasks;
 4 
 5     public class MyEventProcessor : IEventProcessor
 6     {
 7         public Task CloseAsync(PartitionContext context,CloseReason reason)
 8         {
 9             Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}',Reason: '{reason}'.");
10             return Task.CompletedTask;
11         }
12 
13          Task OpenAsync(PartitionContext context)
14 15             Console.WriteLine($MyEventProcessor initialized. Partition: '{context.PartitionId}'16             17 18 
19          Task ProcessErrorAsync(PartitionContext context,Exception error)
20 21             Console.WriteLine($Error on Partition: {context.PartitionId},Error: {error.Message}22             23 24 
25         public Task ProcessEventsAsync(PartitionContext context,IEnumerable<EventData> messages)
26 27             foreach (var eventData in28             {
29                 var data = Encoding.UTF8.GetString(eventData.Body.Array,eventData.Body.Offset,eventData.Body.Count);
30                 Console.WriteLine($Event message received. Partition: '{context.PartitionId}',Data: '{data}'31             }
32 
33              context.CheckpointAsync();
34 35     }

4. Program程序

添加常量作为事件中心连接字符串、事件中心名称、存储帐户容器名称、存储帐户名称和存储帐户密钥。 添加以下代码,并将占位符替换为其对应的值。

        private const string EhConnectionString = {Event Hubs connection string};
        string EhEntityPath = {Event Hub path/name}"; //MyEventHub
        string StorageContainerName = {Storage account container name}eventhubcontainer
        string StorageAccountName = {Storage account name}linux1
        string StorageAccountKey = {Storage account key}";
private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",StorageAccountName,StorageAccountKey);

这里涉及到Azure Storage Account,必须为上篇博文中创建的事件中心MyEventHub指定一个存储账户和存储容器

增加MainAysnc方法:注册事件处理器,处理事件消息

 1         /// <summary>
 2         /// 注册事件处理器
 3         </summary>
 4         <param name="args"></param>
 5         <returns></returns>
 6         static async Task MainAsync(string[] args)
 7  8             Console.WriteLine(Registering EventProcessor... 9 
var eventProcessorHost = new EventProcessorHost(
                EhEntityPath,12                 PartitionReceiver.DefaultConsumerGroupName,1)">13                 EhConnectionString,1)">                StorageConnectionString,1)">15                 StorageContainerName);
16 
17              Registers the Event Processor Host and starts receiving messages
18             await eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>();
19 
20             Console.WriteLine(Receiving. Press ENTER to stop worker.21             Console.ReadLine();
22 
23              Disposes of the Event Processor Host
24             await eventProcessorHost.UnregisterEventProcessorAsync();
25         }

Main函数

1         void Main(2 3             MainAsync(args).GetAwaiter().GetResult();
4         }

Run

?

至此,我们实现了事件消息发送到Event Hub,同时从Event Hub接收处理事件消息。

?

周国庆

2017/5/18

?

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读