python – PySpark马尔可夫模型的算法/编码帮助
我需要一些帮助让我的大脑围绕设计一个(高效)markov链在spark(通过
python).我尽可能地写了它,但是我提出的代码没有扩展.基本上对于各种地图阶段,我编写了自定义函数,它们可以很好地处理几千个序列,但是当我们得到时在20,000(并且我有一些高达800k)的东西慢慢爬行.
对于那些不熟悉马尔科夫模型的人来说,这就是它的要点. 这是我的数据..此时我在RDD中得到了实际数据(没有标题). ID,SEQ 500,HNL,LNH,MLH,HML 我们看一下元组中的序列,所以 (HNL,LNH),(LNH,MLH),etc.. 我需要达到这一点..在那里我返回一个字典(对于每一行数据),然后我将其序列化并存储在内存数据库中. {500: {HNLLNH : 0.333},{LNHMLH : 0.333},{MLHHML : 0.333},{LNHHNL : 0.000},etc.. } 所以本质上,每个序列与下一个序列组合(HNL,LNH变成’HNLLNH’),然后对于所有可能的转换(序列组合),我们计算它们的出现次数,然后除以转换的总数(在这种情况下为3)并获得他们的发生频率. 上面有3个转换,其中一个是HNLLNH ..所以对于HNLLNH,1/3 = 0.333 作为一方不是,我不确定它是否相关,但是序列中每个位置的值是有限的.第一个位置(H / M / L),第二个位置(M / L),第三个位置(H,M,L). 我的代码以前做的是收集()rdd,并使用我编写的函数映射它几次.这些函数首先将字符串转换为列表,然后将列表[1]与列表[2]合并,然后列表[2]列表[3],然后列出[3]列表[4]等等.所以我结束了这样的事情…… [HNLLNH],[LNHMLH],[MHLHML],etc.. 然后,下一个函数使用列表项作为键创建该列表中的字典,然后计算完整列表中该键的总发生率,除以len(列表)以获得频率.然后我将该字典包含在另一个字典中,连同它的ID号(导致第二个代码块,上面一个). 就像我说的,这适用于小型序列,但对于长度为100k的列表则不太好. 另外,请记住,这只是一行数据.我必须在10-20k行数据的任何地方执行此操作,每行的长度为500-800,000个序列. 关于如何编写pyspark代码(使用API?? map / reduce / agg / etc ..函数)有效地执行此操作的任何建议? 编辑 def f(x): # Custom RDD map function # Combines two separate transactions # into a single transition state cust_id = x[0] trans = ','.join(x[1]) y = trans.split(",") s = '' for i in range(len(y)-1): s= s + str(y[i] + str(y[i+1]))+"," return str(cust_id+','+s[:-1]) def g(x): # Custom RDD map function # Calculates the transition state probabilities # by adding up state-transition occurrences # and dividing by total transitions cust_id=str(x.split(",")[0]) trans = x.split(",")[1:] temp_list=[] middle = int((len(trans[0])+1)/2) for i in trans: temp_list.append( (''.join(i)[:middle],''.join(i)[middle:]) ) state_trans = {} for i in temp_list: state_trans[i] = temp_list.count(i)/(len(temp_list)) my_dict = {} my_dict[cust_id]=state_trans return my_dict def gen_tsm_dict_spark(lines): # Takes RDD/string input with format CUST_ID(or)PROFILE_ID,SEQ,SEQ.... # Returns RDD of dict with CUST_ID and tsm per customer # i.e. {cust_id : { ('NLN','LNN') : 0.33,('HPN','NPN') : 0.66} # creates a tuple ([cust/profile_id],[SEQ,SEQ]) cust_trans = lines.map(lambda s: (s.split(",")[0],s.split(",")[1:])) with_seq = cust_trans.map(f) full_tsm_dict = with_seq.map(g) return full_tsm_dict def main(): result = gen_tsm_spark(my_rdd) # Insert into DB for x in result.collect(): for k,v in x.iteritems(): db_insert(k,v) 解决方法
你可以试试下面的东西.它在很大程度上取决于
tooolz ,但如果您希望避免外部依赖,则可以使用一些标准Python库轻松替换它.
from __future__ import division from collections import Counter from itertools import product from toolz.curried import sliding_window,map,pipe,concat from toolz.dicttoolz import merge # Generate all possible transitions defaults = sc.broadcast(dict(map( lambda x: ("".join(concat(x)),0.0),product(product("HNL","NL","HNL"),repeat=2)))) rdd = sc.parallelize(["500,NLH,HNL","600,HNN,NNN,LNH"]) def process(line): """ >>> process("000,HHH,LLL,NNN") ('000',{'LLLNNN': 0.5,'HHHLLL': 0.5}) """ bits = line.split(",") transactions = bits[1:] n = len(transactions) - 1 frequencies = pipe( sliding_window(2,transactions),# Get all transitions map(lambda p: "".join(p)),# Joins strings Counter,# Count lambda cnt: {k: v / n for (k,v) in cnt.items()} # Get frequencies ) return bits[0],frequencies def store_partition(iter): for (k,v) in iter: db_insert(k,merge([defaults.value,v])) rdd.map(process).foreachPartition(store_partition) 由于您知道所有可能的转换,我建议使用稀疏表示并忽略零.此外,您可以使用稀疏向量替换字典以减少内存占用. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |