Python对接支付宝支付自实现
发布时间:2020-12-20 10:21:50 所属栏目:Python 来源:网络整理
导读:本文给出一个用 Python 自行实现(不依赖官方 SDK)的支付宝支付对接示例,基于 requests 与 cryptography,涵盖请求参数签名、同步/异步验签以及调用支付宝网关,完整代码见下文。
# -*- coding: utf-8 -*-
"""Hand-rolled Alipay gateway client (article: "A self-made Alipay payment integration in Python").

The module signs request parameters with the application's RSA private key,
sends them to the Alipay gateway with ``requests`` and verifies the signature
of both synchronous responses and asynchronous notifications with the Alipay
public key.
"""
import base64
import json
import urllib.parse
from datetime import datetime

import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding


class AliPayException(Exception):
    """Error carrying the payload that describes an Alipay failure.

    :param data: arbitrary payload (``command`` passes the decoded response
        dict) kept on ``self.data`` and rendered by ``__str__``.
    """

    def __init__(self, data):
        super(AliPayException, self).__init__()
        self.data = data

    def __str__(self):
        return "alipay - {}".format(self.data)

    def __unicode__(self):
        # Only consulted by Python 2; kept so the class renders the same there.
        return u"alipay - {}".format(self.data)


class AliPayVerifyException(AliPayException):
    """``AliPayException`` whose payload is a formatted ``msg``/``data`` pair.

    NOTE(review): nothing in this module raises it; presumably meant for
    callers that want to wrap signature-verification failures.
    """

    def __init__(self, msg, data):
        super(AliPayVerifyException, self).__init__(
            'alipay verify except - {}:{}'.format(msg, data))


class AliPay:
    """Small client that signs, sends and verifies Alipay gateway calls."""

    def __init__(self, **kwargs):
        """Read the configuration and load both PEM keys from disk.

        :param kwargs:
            app_id: application id assigned by Alipay (required)
            seller_id: seller id, only exposed through ``seller_id`` (required)
            gateway_url: gateway endpoint the requests are sent to (required)
            notify_url: optional URL Alipay calls back asynchronously
            sign_type: ``'RSA2'`` (default) or ``'RSA'``
            app_private_key: path to the PEM private key used for signing
            public_key: path to the PEM public key used for verification
            timeout: optional ``requests`` timeout in seconds (default 30)
        :raises KeyError: when a required entry is missing
        :raises Exception: when ``sign_type`` is neither ``RSA`` nor ``RSA2``
        """
        self._app_id = kwargs['app_id']
        self._seller_id = kwargs['seller_id']
        self._gateway_url = kwargs['gateway_url']
        self._notify_url = kwargs.get('notify_url')
        self._sign_type = kwargs.get('sign_type', 'RSA2')
        if self._sign_type not in ('RSA', 'RSA2'):
            raise Exception('alipay sign_type must `RSA` or `RSA2`')
        self._charset = 'utf-8'
        self._format = 'json'
        # Without a timeout ``requests`` may block forever on a stalled gateway.
        self._timeout = kwargs.get('timeout', 30)

        # The PEM loaders take bytes, so read the key files in binary mode.
        with open(kwargs['app_private_key'], 'rb') as f:
            self._app_private_key = serialization.load_pem_private_key(
                f.read(), None, default_backend())
        with open(kwargs['public_key'], 'rb') as f:
            self._public_key = serialization.load_pem_public_key(
                f.read(), default_backend())

    @property
    def app_id(self):
        """Application id passed to the constructor."""
        return self._app_id

    @property
    def seller_id(self):
        """Seller id passed to the constructor."""
        return self._seller_id

    def _hash_algorithm(self):
        """Return the digest matching ``sign_type``: SHA1 for RSA, SHA256 for RSA2."""
        if self._sign_type == 'RSA':
            return hashes.SHA1()
        return hashes.SHA256()

    @staticmethod
    def sort_data(data):
        """Return the ``(key, value)`` pairs of ``data`` ordered by key."""
        return [(k, data[k]) for k in sorted(data.keys())]

    @classmethod
    def _sign_content(cls, data):
        """Join ``data`` as ``k=v`` pairs, sorted by key and separated by ``&``."""
        return '&'.join('{}={}'.format(k, v) for k, v in cls.sort_data(data))

    def app_private_sign(self, data):
        """Sign ``data`` with the application private key.

        :param data: text to sign (encoded as UTF-8 before signing)
        :return: base64 text of the PKCS#1 v1.5 signature
        """
        signature = self._app_private_key.sign(
            data.encode('utf8'), padding.PKCS1v15(), self._hash_algorithm())
        return base64.b64encode(signature).decode('utf8')

    def sync_verify(self, method, raw_data):
        """Verify the signature of a synchronous gateway response.

        The signed text is cut out of the raw body rather than re-serialised,
        so it is byte-for-byte what the gateway sent.

        :param method: API method name, e.g. ``alipay.trade.precreate``
        :param raw_data: undecoded response body (bytes)
        :raises cryptography.exceptions.InvalidSignature: on a bad signature
        """
        # assumes the body looks like {"<method>_response":{...},"sign":"<b64>"}
        # with "sign" as the last member -- TODO confirm against the gateway.
        method = method.replace('.', '_') + '_response'
        raw_data = raw_data.decode('utf8')
        sign_index = raw_data.rfind('sign')
        # Skip the 7 characters of `sign":"` and drop the trailing `"}`.
        signature = base64.b64decode(raw_data[sign_index + 7: -2])
        # Skip `":` after the response key and stop before `,"` preceding sign.
        method_data = raw_data[
            raw_data.find(method) + len(method) + 2: sign_index - 2]
        self._public_key.verify(
            signature,
            method_data.encode('utf8'),
            padding.PKCS1v15(),
            self._hash_algorithm())

    def async_verify(self, data):
        """Verify the signature of an asynchronous notification.

        :param data: mapping of notification parameters; must contain ``sign``
        :raises KeyError: when ``sign`` is missing
        :raises cryptography.exceptions.InvalidSignature: on a bad signature
        """
        # ``sign`` and ``sign_type`` are left out of the text that gets verified.
        sign_data = {
            k: v for k, v in data.items() if k not in ('sign', 'sign_type')
        }
        self._public_key.verify(
            base64.b64decode(data['sign']),
            self._sign_content(sign_data).encode('utf8'),
            padding.PKCS1v15(),
            self._hash_algorithm())

    def params(self, method, biz_content):
        """Build the signed, URL-encoded query string for one API call.

        :param method: API method name, e.g. ``alipay.trade.precreate``
        :param biz_content: JSON-serialisable business parameters
        :return: ``k=v`` pairs joined by ``&``, values percent-encoded, with
            the ``sign`` parameter appended last
        """
        data = {
            'app_id': self._app_id,
            'method': method,
            'format': self._format,
            'charset': self._charset,
            'sign_type': self._sign_type,
            # yyyy-MM-dd HH:mm:ss
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'version': '1.0',
            'biz_content': json.dumps(biz_content, separators=(',', ':')),
        }
        if self._notify_url:
            data['notify_url'] = self._notify_url

        # The signature covers the raw values; only the query is percent-encoded.
        sign = self.app_private_sign(self._sign_content(data))
        p = '&'.join(
            '{}={}'.format(k, urllib.parse.quote(v))
            for k, v in self.sort_data(data))
        p += '&{}={}'.format('sign', urllib.parse.quote(sign))
        return p

    def command(self, method, biz_content):
        """Call ``method`` on the gateway and return its verified response.

        :param method: API method name, e.g. ``alipay.trade.precreate``
        :param biz_content: JSON-serialisable business parameters
        :return: the decoded ``<method>_response`` dict
        :raises AliPayException: when the response lacks the
            ``<method>_response`` member or reports a ``code`` other than
            ``'10000'``
        :raises cryptography.exceptions.InvalidSignature: on a bad signature
        """
        params = self.params(method, biz_content)
        response = requests.get(
            '%s?%s' % (self._gateway_url, params), timeout=self._timeout)
        response_raw_data = response.content
        response_data = response.json()

        response_key = method.replace('.', '_') + '_response'
        if response_key not in response_data:
            # Report the whole body instead of failing with a bare KeyError.
            raise AliPayException(response_data)
        alipay_response_data = response_data[response_key]
        if alipay_response_data.get('code', '10000') != '10000':
            raise AliPayException(alipay_response_data)
        self.sync_verify(method, response_raw_data)
        return alipay_response_data


if __name__ == '__main__':
    alipay = AliPay(**{
        'app_id': '...',
        'seller_id': '...',
        'gateway_url': 'https://openapi.alipaydev.com/gateway.do',
        'notify_url': '...',
        'app_private_key': 'path to private_key.pem',
        'public_key': 'path to public_key.pem'
    })
    biz_content = {
        'out_trade_no': "111",
        'total_amount': 0.01,
        'subject': "test",
    }
    alipay.command('alipay.trade.precreate', biz_content)

# (Editor: Li Datong) [Disclaimer] All content on this site comes from the
# internet; the statements made represent only the author's personal views and
# not the position of this site. If your rights have been unintentionally
# infringed, please contact the webmaster promptly to have the content removed.