# HG changeset patch # User paulo # Date 1359627579 28800 # Node ID 62464a0034d148907fc74fe57c4ecad864aa881d # Parent 915032dd35f4b27f9e4759118ac93b1c941161ce add threaded url opener diff -r 915032dd35f4 -r 62464a0034d1 myrss/myrss_parser.py --- a/myrss/myrss_parser.py Wed Jan 30 02:32:22 2013 -0800 +++ b/myrss/myrss_parser.py Thu Jan 31 02:19:39 2013 -0800 @@ -1,5 +1,9 @@ import os +import sys import re +import urllib2 +import threading +import Queue import html import xml.etree.ElementTree @@ -7,6 +11,7 @@ MAX_ITEMS = 30 MAX_LINK_Z = 4 +MAX_THREADS = 20 _PARSE_ROOT_TAG_RE = re.compile(r"(\{(.+)\})?(.+)") @@ -72,6 +77,9 @@ link_z = 0 for feed in docstruct: + if feed is None: + continue + (title, link, items) = feed root.h1.a(title, href=link, klass="z%d" % (link_z % MAX_LINK_Z)) @@ -88,28 +96,77 @@ return unicode(root).encode("utf-8") +def _process_url(url): + ret = None + + try: + print >> sys.stderr, "--> processing %s" % url + feed = urllib2.urlopen(url) + except urllib2.HTTPError as e: + print >> sys.stderr, "--> (%s) %s" % (url, e) + return ret + + elementTree = xml.etree.ElementTree.parse(feed) + root = elementTree.getroot() + + parsed_root_tag = _parse_root_tag(root.tag) + + if parsed_root_tag == (None, "rss"): + version = float(root.get("version", 0.0)) + if version >= 2.0: + ret = _go_rss(elementTree) + else: + raise NotImplementedError("Unsupported rss version") + elif parsed_root_tag == ("http://www.w3.org/2005/Atom", "feed"): + ret = _go_atom(elementTree) + else: + raise NotImplementedError("Unknown root tag") + + return ret + + +class WorkerThread(threading.Thread): + def __init__(self, *args, **kwargs): + self._input_queue = kwargs.pop("input_queue") + self._output_queue = kwargs.pop("output_queue") + threading.Thread.__init__(self, *args, **kwargs) + self.daemon = True + + def run(self): + while True: + (idx, url) = self._input_queue.get() + docfeed = None + try: + docfeed = _process_url(url) + except Exception as e: + print >> sys.stderr, "--> (%s) exception: %s" % (url, e) + self._output_queue.put((idx, docfeed)) + self._input_queue.task_done() + + if __name__ == "__main__": + with open("FEEDS") as feeds_file: + feedlines = feeds_file.readlines() - docstruct = [] - XMLFILE = "%d.feedtmp" - - for i in range(31): - if os.path.exists(XMLFILE % i): - elementTree = xml.etree.ElementTree.parse(XMLFILE % i) - root = elementTree.getroot() + docstruct = [None]*len(feedlines) + iq = Queue.Queue(feedlines) + oq = Queue.Queue(feedlines) - if _parse_root_tag(root.tag) == (None, "rss"): - version = float(root.get("version", 0.0)) - if version >= 2.0: - docstruct.append(_go_rss(elementTree)) - else: - raise NotImplementedError("Unsupported rss version") - elif _parse_root_tag(root.tag) == ("http://www.w3.org/2005/Atom", "feed"): - docstruct.append(_go_atom(elementTree)) - else: - raise NotImplementedError("Unknown root tag") + for _ in range(MAX_THREADS): + WorkerThread(input_queue=iq, output_queue=oq).start() - if len(docstruct) > 0: - print _to_html(docstruct) - else: - raise RuntimeError("Could not produce docstruct") + for (i, l) in enumerate(feedlines): + if l[0] != '#': + l = l.strip() + iq.put((i, l)) + + iq.join() + + while True: + try: + (idx, docfeed) = oq.get_nowait() + docstruct[idx] = docfeed + except Queue.Empty: + break + + print _to_html(docstruct)