changeset 40:62464a0034d1

add threaded url opener
author paulo
date Thu, 31 Jan 2013 02:19:39 -0800
parents 915032dd35f4
children 5f9bc02e9caf
files myrss/myrss_parser.py
diffstat 1 files changed, 78 insertions(+), 21 deletions(-) [+]
line diff
     1.1 --- a/myrss/myrss_parser.py	Wed Jan 30 02:32:22 2013 -0800
     1.2 +++ b/myrss/myrss_parser.py	Thu Jan 31 02:19:39 2013 -0800
     1.3 @@ -1,5 +1,9 @@
     1.4  import os
     1.5 +import sys
     1.6  import re
     1.7 +import urllib2
     1.8 +import threading
     1.9 +import Queue
    1.10  
    1.11  import html
    1.12  import xml.etree.ElementTree 
    1.13 @@ -7,6 +11,7 @@
    1.14  
    1.15  MAX_ITEMS = 30
    1.16  MAX_LINK_Z = 4
    1.17 +MAX_THREADS = 20
    1.18  
    1.19  
    1.20  _PARSE_ROOT_TAG_RE = re.compile(r"(\{(.+)\})?(.+)")
    1.21 @@ -72,6 +77,9 @@
    1.22  	link_z = 0
    1.23  
    1.24  	for feed in docstruct:
    1.25 +		if feed is None:
    1.26 +			continue
    1.27 +
    1.28  		(title, link, items) = feed
    1.29  
    1.30  		root.h1.a(title, href=link, klass="z%d" % (link_z % MAX_LINK_Z))
    1.31 @@ -88,28 +96,77 @@
    1.32  	return unicode(root).encode("utf-8")
    1.33  
    1.34  
    1.35 +def _process_url(url):
    1.36 +	ret = None
    1.37 +
    1.38 +	try:
    1.39 +		print >> sys.stderr, "--> processing %s" % url
    1.40 +		feed = urllib2.urlopen(url)
    1.41 +	except urllib2.HTTPError as e:
    1.42 +		print >> sys.stderr, "--> (%s) %s" % (url, e)
    1.43 +		return ret
    1.44 +
    1.45 +	elementTree = xml.etree.ElementTree.parse(feed)
    1.46 +	root = elementTree.getroot()
    1.47 +
    1.48 +	parsed_root_tag = _parse_root_tag(root.tag) 
    1.49 +
    1.50 +	if parsed_root_tag == (None, "rss"):
    1.51 +		version = float(root.get("version", 0.0))
    1.52 +		if version >= 2.0:
    1.53 +			ret = _go_rss(elementTree)
    1.54 +		else:
    1.55 +			raise NotImplementedError("Unsupported rss version")
    1.56 +	elif parsed_root_tag == ("http://www.w3.org/2005/Atom", "feed"):
    1.57 +		ret = _go_atom(elementTree)
    1.58 +	else:
    1.59 +		raise NotImplementedError("Unknown root tag")
    1.60 +
    1.61 +	return ret
    1.62 +
    1.63 +
    1.64 +class WorkerThread(threading.Thread):
    1.65 +	def __init__(self, *args, **kwargs):
    1.66 +		self._input_queue = kwargs.pop("input_queue")
    1.67 +		self._output_queue = kwargs.pop("output_queue")
    1.68 +		threading.Thread.__init__(self, *args, **kwargs)
    1.69 +		self.daemon = True
    1.70 +
    1.71 +	def run(self):
    1.72 +		while True:
    1.73 +			(idx, url) = self._input_queue.get()
    1.74 +			docfeed = None
    1.75 +			try:
    1.76 +				docfeed = _process_url(url)
    1.77 +			except Exception as e:
    1.78 +				print >> sys.stderr, "--> (%s) exception: %s" % (url, e)
    1.79 +			self._output_queue.put((idx, docfeed))
    1.80 +			self._input_queue.task_done()
    1.81 +			
    1.82 +
    1.83  if __name__ == "__main__":
    1.84 +	with open("FEEDS") as feeds_file:
    1.85 +		feedlines = feeds_file.readlines()
    1.86  
    1.87 -	docstruct = []
    1.88 -	XMLFILE = "%d.feedtmp"
    1.89 -	
    1.90 -	for i in range(31):
    1.91 -		if os.path.exists(XMLFILE % i):
    1.92 -			elementTree = xml.etree.ElementTree.parse(XMLFILE % i)
    1.93 -			root = elementTree.getroot()
    1.94 +	docstruct = [None]*len(feedlines)
    1.95 +	iq = Queue.Queue(feedlines)
    1.96 +	oq = Queue.Queue(feedlines)
    1.97  
    1.98 -			if _parse_root_tag(root.tag) == (None, "rss"):
    1.99 -				version = float(root.get("version", 0.0))
   1.100 -				if version >= 2.0:
   1.101 -					docstruct.append(_go_rss(elementTree))
   1.102 -				else:
   1.103 -					raise NotImplementedError("Unsupported rss version")
   1.104 -			elif _parse_root_tag(root.tag) == ("http://www.w3.org/2005/Atom", "feed"):
   1.105 -				docstruct.append(_go_atom(elementTree))
   1.106 -			else:
   1.107 -				raise NotImplementedError("Unknown root tag")
   1.108 +	for _ in range(MAX_THREADS):
   1.109 +		WorkerThread(input_queue=iq, output_queue=oq).start()
   1.110  
   1.111 -	if len(docstruct) > 0:
   1.112 -		print _to_html(docstruct)
   1.113 -	else:
   1.114 -		raise RuntimeError("Could not produce docstruct")
   1.115 +	for (i, l) in enumerate(feedlines):
   1.116 +		if l[0] != '#':
   1.117 +			l = l.strip()
   1.118 +			iq.put((i, l))
   1.119 +
   1.120 +	iq.join()
   1.121 +
   1.122 +	while True:
   1.123 +		try:
   1.124 +			(idx, docfeed) = oq.get_nowait()
   1.125 +			docstruct[idx] = docfeed
   1.126 +		except Queue.Empty:
   1.127 +			break
   1.128 +
   1.129 +	print _to_html(docstruct)