Mercurial > hg > index.fcgi > www > www-1
changeset 40:62464a0034d1
add threaded url opener
author | paulo |
---|---|
date | Thu, 31 Jan 2013 02:19:39 -0800 |
parents | 915032dd35f4 |
children | 5f9bc02e9caf |
files | myrss/myrss_parser.py |
diffstat | 1 files changed, 78 insertions(+), 21 deletions(-) [+] |
line diff
1.1 --- a/myrss/myrss_parser.py Wed Jan 30 02:32:22 2013 -0800 1.2 +++ b/myrss/myrss_parser.py Thu Jan 31 02:19:39 2013 -0800 1.3 @@ -1,5 +1,9 @@ 1.4 import os 1.5 +import sys 1.6 import re 1.7 +import urllib2 1.8 +import threading 1.9 +import Queue 1.10 1.11 import html 1.12 import xml.etree.ElementTree 1.13 @@ -7,6 +11,7 @@ 1.14 1.15 MAX_ITEMS = 30 1.16 MAX_LINK_Z = 4 1.17 +MAX_THREADS = 20 1.18 1.19 1.20 _PARSE_ROOT_TAG_RE = re.compile(r"(\{(.+)\})?(.+)") 1.21 @@ -72,6 +77,9 @@ 1.22 link_z = 0 1.23 1.24 for feed in docstruct: 1.25 + if feed is None: 1.26 + continue 1.27 + 1.28 (title, link, items) = feed 1.29 1.30 root.h1.a(title, href=link, klass="z%d" % (link_z % MAX_LINK_Z)) 1.31 @@ -88,28 +96,77 @@ 1.32 return unicode(root).encode("utf-8") 1.33 1.34 1.35 +def _process_url(url): 1.36 + ret = None 1.37 + 1.38 + try: 1.39 + print >> sys.stderr, "--> processing %s" % url 1.40 + feed = urllib2.urlopen(url) 1.41 + except urllib2.HTTPError as e: 1.42 + print >> sys.stderr, "--> (%s) %s" % (url, e) 1.43 + return ret 1.44 + 1.45 + elementTree = xml.etree.ElementTree.parse(feed) 1.46 + root = elementTree.getroot() 1.47 + 1.48 + parsed_root_tag = _parse_root_tag(root.tag) 1.49 + 1.50 + if parsed_root_tag == (None, "rss"): 1.51 + version = float(root.get("version", 0.0)) 1.52 + if version >= 2.0: 1.53 + ret = _go_rss(elementTree) 1.54 + else: 1.55 + raise NotImplementedError("Unsupported rss version") 1.56 + elif parsed_root_tag == ("http://www.w3.org/2005/Atom", "feed"): 1.57 + ret = _go_atom(elementTree) 1.58 + else: 1.59 + raise NotImplementedError("Unknown root tag") 1.60 + 1.61 + return ret 1.62 + 1.63 + 1.64 +class WorkerThread(threading.Thread): 1.65 + def __init__(self, *args, **kwargs): 1.66 + self._input_queue = kwargs.pop("input_queue") 1.67 + self._output_queue = kwargs.pop("output_queue") 1.68 + threading.Thread.__init__(self, *args, **kwargs) 1.69 + self.daemon = True 1.70 + 1.71 + def run(self): 1.72 + while True: 1.73 + (idx, url) = self._input_queue.get() 1.74 + docfeed = None 1.75 + try: 1.76 + docfeed = _process_url(url) 1.77 + except Exception as e: 1.78 + print >> sys.stderr, "--> (%s) exception: %s" % (url, e) 1.79 + self._output_queue.put((idx, docfeed)) 1.80 + self._input_queue.task_done() 1.81 + 1.82 + 1.83 if __name__ == "__main__": 1.84 + with open("FEEDS") as feeds_file: 1.85 + feedlines = feeds_file.readlines() 1.86 1.87 - docstruct = [] 1.88 - XMLFILE = "%d.feedtmp" 1.89 - 1.90 - for i in range(31): 1.91 - if os.path.exists(XMLFILE % i): 1.92 - elementTree = xml.etree.ElementTree.parse(XMLFILE % i) 1.93 - root = elementTree.getroot() 1.94 + docstruct = [None]*len(feedlines) 1.95 + iq = Queue.Queue(feedlines) 1.96 + oq = Queue.Queue(feedlines) 1.97 1.98 - if _parse_root_tag(root.tag) == (None, "rss"): 1.99 - version = float(root.get("version", 0.0)) 1.100 - if version >= 2.0: 1.101 - docstruct.append(_go_rss(elementTree)) 1.102 - else: 1.103 - raise NotImplementedError("Unsupported rss version") 1.104 - elif _parse_root_tag(root.tag) == ("http://www.w3.org/2005/Atom", "feed"): 1.105 - docstruct.append(_go_atom(elementTree)) 1.106 - else: 1.107 - raise NotImplementedError("Unknown root tag") 1.108 + for _ in range(MAX_THREADS): 1.109 + WorkerThread(input_queue=iq, output_queue=oq).start() 1.110 1.111 - if len(docstruct) > 0: 1.112 - print _to_html(docstruct) 1.113 - else: 1.114 - raise RuntimeError("Could not produce docstruct") 1.115 + for (i, l) in enumerate(feedlines): 1.116 + if l[0] != '#': 1.117 + l = l.strip() 1.118 + iq.put((i, l)) 1.119 + 1.120 + iq.join() 1.121 + 1.122 + while True: 1.123 + try: 1.124 + (idx, docfeed) = oq.get_nowait() 1.125 + docstruct[idx] = docfeed 1.126 + except Queue.Empty: 1.127 + break 1.128 + 1.129 + print _to_html(docstruct)