python – 使用rest api时选择Azure事件中心的分区
发布时间:2020-12-20 13:52:40 所属栏目:Python 来源:网络整理
导读:我正在尝试使用 python和其他API向Azure Event Hub发送消息 经过一些失败的实验,我找到了工作代码(见下文),但我希望能够选择发送事件的分区. 这是否可以使用其余的API,如果是这样,应该怎么做? #!/user/bin/pythonimport jsonfrom datetime import datetimef
我正在尝试使用
python和其他API向Azure Event Hub发送消息
经过一些失败的实验,我找到了工作代码(见下文),但我希望能够选择发送事件的分区. 这是否可以使用其余的API,如果是这样,应该怎么做? #!/user/bin/python import json from datetime import datetime from multiprocessing import Pool # from azure.servicebus import _service_bus_error_handler from azure.servicebus.servicebusservice import ServiceBusService,ServiceBusSASAuthentication from azure.http import ( HTTPRequest,HTTPError ) from azure.http.httpclient import _HTTPClient EVENT_HUB_HOST = "mysecrethub.servicebus.windows.net" EVENT_HUB_NAME = "secerthub-name" KEYNAME = "senderkey" # needs to be loaded from ENV KEYVALUE = "keyvalue" # needs to be loaded from ENV EXTRA_HEADERS = [] NUM_OF_PARTITIONS = 16 class EventHubClient(object): def __init__(self,host,hubname,keyname,keyvalue): self._host = host self._hub = hubname self._keyname = keyname self._key = keyvalue def sendMessage(self,body,partition=None,additional_headers=None): eventHubHost = self._host httpclient = _HTTPClient(service_instance=self) sasKeyName = self._keyname sasKeyValue = self._key authentication = ServiceBusSASAuthentication(sasKeyName,sasKeyValue) request = HTTPRequest() request.method = "POST" request.host = eventHubHost request.protocol_override = "https" request.path = "/%s/messages?api-version=2014-01" % (self._hub) request.body = body request.headers.append(('Content-Type','application/atom+xml;type=entry;charset=utf-8')) if additional_headers is not None: for item in additional_headers: request.headers.append(item) if partition is not None: value = json.dumps({'PartitionKey': partition}) request.headers.append(('BrokerProperties',value)) authentication.sign_request(request,httpclient) request.headers.append(('Content-Length',str(len(request.body)))) status = 0 try: resp = httpclient.perform_request(request) status = resp.status except HTTPError as ex: status = ex.status # print request.headers return status def prepare_message(appid,sessionid,partitionKey=None,SessionEllapsed=None,DeviceOs=None): message = {"Name": "MonitorEvent"} Attributes = {"AppId": appid,"SessionStarted": "".join(str(datetime.now())[:-3])} if SessionEllapsed is not None: Attributes['SessionEllapsed'] = SessionEllapsed if DeviceOs is not None: Attributes['DeviceOs'] = DeviceOs if partitionKey is not None: message["PartitionKey"] = str(partitionKey) message["PartitionId"] = str(partitionKey) Attributes['ItemId'] = partitionKey message['Attributes'] = Attributes return json.dumps(message) def send_monitoring_event(partition): hubClient = EventHubClient(EVENT_HUB_HOST,EVENT_HUB_NAME,KEYNAME,KEYVALUE) appid = 1 sendertime = datetime.now().strftime('%Y%M%d-%H%M%S') message = prepare_message(appid,sendertime,partitionKey=partition,SessionEllapsed=1,DeviceOs='Monitor' + str(partition)) # print message hubStatus = hubClient.sendMessage(message,additional_headers=EXTRA_HEADERS) # return the HTTP status to the caller return hubStatus def main(): pool = Pool(processes=NUM_OF_PARTITIONS) print pool.map(send_monitoring_event,range(NUM_OF_PARTITIONS)) if __name__ == '__main__': main() 解决方法
在Event Hubs REST API docunment
https://msdn.microsoft.com/en-us/library/azure/dn790664.aspx的“发送事件”部分之后,您无法使用请求URI https:// {serviceNamespace} .servicebus.windows.net / {eventHubPath} / messages来选择要发送事件的分区.
您应该使用请求URI https:// {serviceNamespace} .servicebus.windows.net / {eventHubPath} / publishers / {deviceId} / messages.属性{deviceId}是用于分组/分区设备的分区键 – 无论是地理位置,设备类型,版本,租户等. 但是分区计数必须是介于2和32之间的数字.因此,如果您需要使用超过32个分区,我建议将密钥放入事件数据中. 最好的祝福. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |