需求:一個 CSV 文件中有很多行,每行包含一個 id 和一個字符串;這些字符串可能兩兩相似(是相似,不是相同)。如何去重,只保留兩兩相似度小於 0.8 的 id?
做法:用 difflib 庫計算兩兩相似度(這一步可以用進程/線程/協程加速),然後將相似度達到或超過 0.8 的 id 放到 set 中;最後從 set 中任取一個,再加上不在集合中的成員,即可得到去重結果。
# Near-duplicate finder for a two-column CSV file (id, text).
#
# Usage: script.py FILE [THRESHOLD | -a] [WORKERS] [STRIP(0/1)] [TRUNCATE]
# Runs unchanged on Python 2 and Python 3.
import csv
import difflib
import os
import sys
from multiprocessing import Pool
from multiprocessing import Queue

try:
    from HTMLParser import HTMLParser  # Python 2
except ImportError:
    from html.parser import HTMLParser  # Python 3

# Queue onto which CSV.diff() pushes the ids of similar rows.  It is kept for
# callers of CSV.diff(); main() gathers worker results through the return
# value of Pool.map instead, so it does not rely on worker processes sharing
# this module-level object.
wait_set = Queue()


class HTMLStripper(HTMLParser):
    """HTML parser that collects only the text found between tags."""

    def __init__(self):
        # Explicit base-class call rather than super(): on Python 2
        # HTMLParser is an old-style class.  The base initialiser also
        # calls reset().
        HTMLParser.__init__(self)
        self.fed = []

    def handle_data(self, d):
        """Remember one run of text reported by the parser."""
        self.fed.append(d)

    def get_data(self):
        """Return all text seen so far as a single string."""
        return ''.join(self.fed)


def strip_tags(html):
    """Return *html* with its markup removed.

    Byte strings are decoded as UTF-8 first; text strings are used as is.
    """
    if isinstance(html, bytes):
        html = html.decode('UTF-8')
    stripper = HTMLStripper()
    stripper.feed(html)
    # close() makes the parser hand over any text it is still buffering.
    stripper.close()
    return stripper.get_data()


def distance(s1, s2):
    """Return the difflib similarity ratio of *s1* and *s2* (0.0 to 1.0)."""
    return difflib.SequenceMatcher(None, s1, s2).ratio()


def _raise_field_size_limit():
    """Set the csv field size limit as high as this platform accepts."""
    limit = sys.maxsize
    while True:
        try:
            csv.field_size_limit(limit)
            return
        except OverflowError:
            # sys.maxsize does not fit where the C long is narrower
            # (the original comment reports this for Windows); back off.
            limit //= 10


def _open_csv(path):
    """Open *path* for csv.reader in the way each Python major version needs."""
    if sys.version_info[0] >= 3:
        return open(path, 'r', newline='', encoding='UTF-8')
    return open(path, 'rU')


class CSV(object):
    """In-memory copy of a CSV file whose rows are ``[id, text, ...]``."""

    def __init__(self, csvfile, delimiter=',', quotechar='"'):
        """Read every row of *csvfile* into ``self.items``."""
        self.csvfile = csvfile
        self.delimiter = delimiter
        self.quotechar = quotechar
        _raise_field_size_limit()
        with _open_csv(csvfile) as f:
            reader = csv.reader(f, delimiter=delimiter, quotechar=quotechar)
            self.items = list(reader)

    def _similar_pairs(self, threshold=None, strip=None, truncate=None,
                       start=0, end=None):
        """Return ``(id, later_id)`` for every similar pair of rows.

        Each row in ``items[start:end]`` is compared with every row that
        follows it in the whole file.  A pair counts as similar when its
        ratio is not below *threshold*; with a falsy *threshold* every pair
        counts.  *strip* removes HTML markup before comparing and *truncate*
        limits the comparison to that many leading characters.
        """
        rows = self.items
        prepared_texts = {}

        def prepared(index):
            # Strip and truncate each row only once, on first use.
            if index not in prepared_texts:
                text = rows[index][1]
                if strip:
                    text = strip_tags(text)
                if truncate:
                    text = text[0:truncate]
                prepared_texts[index] = text
            return prepared_texts[index]

        pairs = []
        for i in range(len(rows))[start:end]:
            row_id = rows[i][0]
            text = prepared(i)
            # Only later rows: earlier ones were already paired with this one.
            for j in range(i + 1, len(rows)):
                score = distance(text, prepared(j))
                if threshold and score < threshold:
                    continue
                pairs.append((row_id, rows[j][0]))
        return pairs

    def diff(self, threshold=None, strip=None, truncate=None, start=0,
             end=None):
        """Push the ids of every similar pair of rows onto ``wait_set``.

        Takes the same arguments as ``_similar_pairs``.  For each similar
        pair the id of the earlier row is queued, then the id of the later
        row.
        """
        pairs = self._similar_pairs(threshold, strip, truncate, start, end)
        for first_id, second_id in pairs:
            wait_set.put(first_id)
            wait_set.put(second_id)

    def output(self, data):
        """Print the strings in *data* as one comma-separated line."""
        print(','.join(data))
        sys.stdout.flush()

    def analysis(self):
        """Print the row count, the file size and per-row text lengths."""
        print("%s items, %s bytes." % (len(self.items),
                                       os.path.getsize(self.csvfile)))
        row_format = '%6s %32s %32s %32s'
        print(row_format % ('No', 'UUID', 'Length(after strip)',
                            'Length(before strip)'))
        for number, item in enumerate(self.items):
            print(row_format % (number, item[0], len(strip_tags(item[1])),
                                len(item[1])))


def wrapper(args):
    """Pool entry point: *args* is ``(csv_object, threshold, strip, truncate,
    start, end)``; returns the similar pairs found in that slice."""
    table = args[0]
    return table._similar_pairs(*args[1:])


def _parallel_similar_pairs(table, threshold, strip, truncate, workers):
    """Split the rows into *workers* slices and compare them in a pool."""
    total = len(table.items)
    batch = total // workers
    jobs = []
    for number in range(workers):
        start = batch * number
        # The last slice also takes the rows left over by the division.
        end = total if number == workers - 1 else start + batch
        jobs.append((table, threshold, strip, truncate, start, end))
    pool = Pool(workers)
    try:
        chunks = pool.map(wrapper, jobs)
    finally:
        pool.terminate()
        pool.join()
    return [pair for chunk in chunks for pair in chunk]


def _drop_similar(ids, similar_pairs):
    """Return *ids* without any id that is similar to an id kept before it.

    *similar_pairs* holds ``(earlier_id, later_id)`` tuples.  Walking the ids
    in file order and dropping a row only when one of its earlier matches was
    kept leaves exactly one representative per group of similar rows.
    """
    earlier_matches = {}
    for first_id, second_id in similar_pairs:
        earlier_matches.setdefault(second_id, set()).add(first_id)
    kept = []
    kept_ids = set()
    for row_id in ids:
        if kept_ids.intersection(earlier_matches.get(row_id, ())):
            continue
        kept.append(row_id)
        kept_ids.add(row_id)
    return kept


def main():
    """Run the command-line tool.

    ``sys.argv``: FILE, then optionally THRESHOLD (or ``-a`` for the analysis
    report), worker count, strip flag (0/1) and truncate length.  In diff
    mode the ids that survive de-duplication are printed and returned.
    """
    csvfile = sys.argv[1]
    threshold = 0.8
    threads = 4
    strip = True
    truncate = 256
    action = 'diff'

    argc = len(sys.argv)
    if argc > 2:
        if sys.argv[2] == '-a':
            action = 'analysis'
        else:
            threshold = float(sys.argv[2])
    if argc > 3:
        threads = int(sys.argv[3])
    if argc > 4:
        strip = bool(int(sys.argv[4]))
    if argc > 5:
        truncate = int(sys.argv[5])

    c = CSV(csvfile)
    if action == 'analysis':
        c.analysis()
        sys.stdout.flush()
        return None

    if threads > 1:
        similar = _parallel_similar_pairs(c, threshold, strip, truncate,
                                          threads)
    else:
        similar = c._similar_pairs(threshold, strip, truncate)

    # The first row is left out of the result, as in the original script
    # (presumably a header row -- confirm against the input files).
    wait_list = _drop_similar([row[0] for row in c.items[1:]], similar)
    print(wait_list)
    sys.stdout.flush()
    return wait_list


if __name__ == '__main__':
    main()