"""Render the feeds listed in ``FEEDS`` as one cached HTML page.

``MyRssApp`` is a WSGI callable; ``main`` builds the page (or re-reads it from
``CACHE_HTML_FILE`` while the cache is younger than ``CACHE_LIFE`` seconds).
"""

import datetime
import gzip
import html
import io
import logging
import os
import queue
import re
import sys
import threading
import time
import traceback
import urllib.error
import urllib.request
import xml.etree.ElementTree

LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
logging.basicConfig(
    level=getattr(logging, LOG_LEVEL),
    format="%(asctime)s %(levelname)-8s %(message)s",
)

# Imported after logging is configured, as in the original module layout.
from html3.html3 import HTML


FEEDS_FILE = "FEEDS"
CACHE_HTML_FILE = "__cache__.html"

CACHE_LIFE = 1200  # [seconds]
MAX_ITEMS = 50
MAX_LINK_Z = 4
MAX_THREADS = 20
URLOPEN_TIMEOUT = 10  # [seconds]


_PARSE_ROOT_TAG_RE = re.compile(r"(\{(.+)\})?(.+)")


def _parse_root_tag(root_tag):
    """Split an ElementTree tag into ``(namespace, local_name)``.

    The namespace is ``None`` when the tag has no ``{...}`` prefix.  Returns
    ``(None, None)`` when the tag does not match at all (e.g. empty string).
    """
    re_match = _PARSE_ROOT_TAG_RE.match(root_tag)

    if re_match is None:
        return (None, None)
    return re_match.group(2, 3)


def _strip_if_not_none(txt):
    """Return ``txt`` stripped of surrounding whitespace, or ``''`` for None."""
    return txt.strip() if txt is not None else ''


def _child_text(parent, path):
    """Return the ``.text`` of the first element matching ``path``.

    Returns ``None`` both when the element is missing and when it is present
    but empty, so an absent optional element no longer raises AttributeError.
    """
    node = parent.find(path)
    return node.text if node is not None else None


def _go_rss(elementTree):
    """Extract ``(title, link, items)`` from an RSS 2.x document.

    ``items`` is a list of ``(title, link)`` tuples, at most ``MAX_ITEMS``
    long.  Titles are stripped strings (``''`` when absent); links may be
    ``None`` when the element is absent or empty.
    """
    title = _strip_if_not_none(_child_text(elementTree, "channel/title"))
    link = _child_text(elementTree, "channel/link")

    items = [
        (_strip_if_not_none(_child_text(item, "title")), _child_text(item, "link"))
        for item in elementTree.findall("channel/item")[:MAX_ITEMS]
    ]

    return (title, link, items)


def _atom_link(links):
    """Pick the ``href`` to use from a list of Atom ``<link>`` elements.

    A lone link is always used.  Otherwise the first link whose ``rel`` is
    "alternate" wins; a link without ``rel`` counts as "alternate", which is
    the default relation defined by RFC 4287.  Returns ``''`` when nothing
    qualifies.
    """
    for candidate in links:
        if len(links) == 1 or candidate.get("rel", "alternate") == "alternate":
            return candidate.get("href")
    return ''


def _go_atom(elementTree):
    """Extract ``(title, link, items)`` from an Atom 1.0 document.

    ``items`` is a list of ``(title, link)`` tuples, at most ``MAX_ITEMS``
    long.
    """
    ns = "http://www.w3.org/2005/Atom"
    title_path = "{%s}title" % ns
    link_path = "{%s}link" % ns

    title = _strip_if_not_none(_child_text(elementTree, title_path))
    link = _atom_link(elementTree.findall(link_path))

    items = [
        (
            _strip_if_not_none(_child_text(entry, title_path)),
            _atom_link(entry.findall(link_path)),
        )
        for entry in elementTree.findall("{%s}entry" % ns)[:MAX_ITEMS]
    ]

    return (title, link, items)


def _go_purl_rss(elementTree):
    """Extract ``(title, link, items)`` from an RSS 1.0 (RDF) document.

    ``items`` is a list of ``(title, link)`` tuples, at most ``MAX_ITEMS``
    long.
    """
    ns = "http://purl.org/rss/1.0/"
    title_path = "{%s}title" % ns
    link_path = "{%s}link" % ns

    title = _strip_if_not_none(
        _child_text(elementTree, "{%s}channel/{%s}title" % (ns, ns)))
    link = _child_text(elementTree, "{%s}channel/{%s}link" % (ns, ns))

    items = [
        (_strip_if_not_none(_child_text(item, title_path)), _child_text(item, link_path))
        for item in elementTree.findall("{%s}item" % ns)[:MAX_ITEMS]
    ]

    return (title, link, items)


_STRIP_HTML_RE = re.compile(r"<.*?>")


def _strip_html(txt):
    """Remove ``<...>`` tags from ``txt`` and decode HTML entities."""
    return html.unescape(_STRIP_HTML_RE.sub('', txt))


def _to_html(dtnow, docstruct):
    """Render the parsed feeds as an HTML page and return it as a string.

    ``dtnow`` is the datetime shown in the page title and the start point of
    the elapsed-time figure appended at the bottom.  ``docstruct`` is a list
    of ``(title, link, items)`` tuples; ``None`` entries are skipped.
    """
    datetime_str = dtnow.strftime("%Y-%m-%d %H:%M %Z")
    page_title = "myrss -- %s" % datetime_str

    root = HTML("html")

    header = root.head
    header.meta(name="viewport", content="width=device-width, initial-scale=1")
    header.title(page_title)
    header.link(rel="stylesheet", type="text/css", href="static/index.css")

    body = root.body
    body.h1(page_title)

    # Cycles through the z0..z(MAX_LINK_Z-1) CSS classes.
    link_z = 0

    for feed in docstruct:
        if feed is None:
            continue

        (title, link, items) = feed

        logging.debug("title: %s", title)
        body.h2.a(_strip_html(title), href=link, klass="z%d" % (link_z % MAX_LINK_Z))
        link_z += 1
        p = body.p

        for (i, (it_title, it_link)) in enumerate(items):
            if i > 0:
                p += " - "

            if not it_title:
                it_title = "(missing title)"
            if it_link is not None:
                p.a(_strip_html(it_title), href=it_link, klass="z%d" % (link_z % MAX_LINK_Z))
            else:
                p += _strip_html(it_title)

            link_z += 1

    dtdelta = datetime.datetime.now() - dtnow
    root.div("%.3f" % dtdelta.total_seconds(), klass="debug")

    return str(root)


def _fetch_url(url):
    """Download ``url`` and return its body decoded as UTF-8 text.

    Returns ``None`` when the server answers with an HTTP error status.  Any
    other failure (bad URL, timeout, undecodable body, ...) propagates to the
    caller.
    """
    try:
        logging.info("processing %s", url)
        request = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0 Browser"})
        # The context manager closes the connection even if read() fails.
        with urllib.request.urlopen(request, timeout=URLOPEN_TIMEOUT) as feed:
            content_encoding = feed.info().get("Content-Encoding", "")
            body = feed.read()
    except urllib.error.HTTPError as e:
        logging.info("(%s) %s", url, e)
        return None

    # Header lookup is case-insensitive; "x-gzip" is a legacy alias of "gzip".
    if content_encoding.strip().lower() in ("gzip", "x-gzip"):
        body = gzip.decompress(body)

    return str(body, encoding="utf-8")


def _filter_feed(feed):
    """Strip surrounding whitespace and remove characters listed in ``filter_out``."""
    ret = feed.strip()

    filter_out = ["\x16"]
    for i in filter_out:
        ret = ret.replace(i, "")

    return ret


def _process_feed(feed):
    """Parse feed XML text and return ``(title, link, items)``.

    Dispatches on the root tag: RSS >= 2.0, Atom 1.0 or RDF (RSS 1.0).

    Raises:
        NotImplementedError: for older RSS versions or an unknown root tag.
    """
    ret = None

    feed_sio = io.StringIO(feed)
    elementTree = xml.etree.ElementTree.parse(feed_sio)
    root = elementTree.getroot()

    parsed_root_tag = _parse_root_tag(root.tag)

    if parsed_root_tag == (None, "rss"):
        version = float(root.get("version", 0.0))
        if version >= 2.0:
            ret = _go_rss(elementTree)
        else:
            raise NotImplementedError("Unsupported rss version")
    elif parsed_root_tag == ("http://www.w3.org/2005/Atom", "feed"):
        ret = _go_atom(elementTree)
    elif parsed_root_tag == ("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "RDF"):
        ret = _go_purl_rss(elementTree)
    else:
        raise NotImplementedError("Unknown root tag")

    return ret


class WorkerThread(threading.Thread):
    """Daemon thread that turns ``(idx, url)`` jobs into ``(idx, feed)`` results."""

    def __init__(self, *args, **kwargs):
        """Accept the required ``input_queue`` and ``output_queue`` keyword args."""
        self._input_queue = kwargs.pop("input_queue")
        self._output_queue = kwargs.pop("output_queue")
        threading.Thread.__init__(self, *args, **kwargs)
        self.daemon = True

    def run(self):
        """Process jobs forever; a failed feed yields ``(idx, None)``."""
        while True:
            (idx, url) = self._input_queue.get()
            docfeed = None
            try:
                feed = _fetch_url(url)
                if feed is not None:
                    docfeed = _process_feed(_filter_feed(feed))
            except Exception as e:
                logging.info("(%s) exception: (%s) %s", url, type(e), e)
            # Always answer, so the consumer's result count stays in sync.
            self._output_queue.put((idx, docfeed))


def _cache_is_fresh(epoch_now):
    """Return True when the cache file exists and is younger than CACHE_LIFE."""
    try:
        mtime = os.stat(CACHE_HTML_FILE).st_mtime
    except OSError:
        return False
    return (epoch_now - mtime) < float(CACHE_LIFE)


def _read_feed_jobs():
    """Read FEEDS_FILE; return ``(line_count, [(line_index, url), ...])``.

    Blank lines and lines whose first non-blank character is ``#`` are skipped.
    """
    with open(FEEDS_FILE) as feeds_file:
        feedlines = feeds_file.readlines()

    jobs = []
    for (i, line) in enumerate(feedlines):
        url = line.strip()
        if url and not url.startswith('#'):
            jobs.append((i, url))

    return (len(feedlines), jobs)


def _run_jobs(jobs, slots, input_queue, output_queue):
    """Submit ``jobs`` to the workers and return results placed by index.

    Submission is interleaved with collection: both queues are bounded, so
    pushing every job before reading any result can block forever once the
    input queue, the workers and the output queue are all full.
    """
    docstruct = [None] * slots
    submitted = 0

    for _ in range(len(jobs)):
        # Top up the input queue without blocking.
        while submitted < len(jobs):
            try:
                input_queue.put_nowait(jobs[submitted])
            except queue.Full:
                break
            submitted += 1

        # At least one job is outstanding here, so this get() will return.
        (idx, docfeed) = output_queue.get()
        docstruct[idx] = docfeed

    return docstruct


def main(input_queue, output_queue, lock):
    """Return the HTML page, rebuilding it when the cache has expired.

    ``input_queue`` / ``output_queue`` are the queues served by the
    ``WorkerThread`` pool.  The whole operation runs while holding ``lock``.
    """
    ret = ''

    with lock:
        logging.debug("main() started")
        epoch_now = time.time()
        dtnow = datetime.datetime.fromtimestamp(epoch_now)

        if _cache_is_fresh(epoch_now):
            # The page is served as UTF-8, so store it as UTF-8 regardless of
            # the process locale.
            with open(CACHE_HTML_FILE, encoding="utf-8") as cache_html_file:
                ret = cache_html_file.read()
        else:
            (slots, jobs) = _read_feed_jobs()
            docstruct = _run_jobs(jobs, slots, input_queue, output_queue)

            ret = _to_html(dtnow, docstruct)

            with open(CACHE_HTML_FILE, 'w', encoding="utf-8") as cache_html_file:
                cache_html_file.write(ret)
        logging.debug("main() ended")

    return ret


class MyRssApp:
    """WSGI application that serves the aggregated feed page."""

    def __init__(self):
        """Create the job/result queues and start ``MAX_THREADS`` workers."""
        logging.debug("MyRssApp.__init__() called")
        self._iq = queue.Queue(MAX_THREADS)
        self._oq = queue.Queue(MAX_THREADS)
        self._main_lock = threading.Lock()

        for i in range(MAX_THREADS):
            logging.debug("Starting thread: %d", i)
            WorkerThread(input_queue=self._iq, output_queue=self._oq).start()

    # Raw WSGI
    def __call__(self, environ, start_response):
        """Serve the page; on failure answer 500 with the traceback as text."""
        response_code = "500 Internal Server Error"
        response_type = "text/plain; charset=UTF-8"

        try:
            response_body = main(self._iq, self._oq, self._main_lock)
            response_code = "200 OK"
            response_type = "text/html; charset=UTF-8"
        except Exception:
            response_body = traceback.format_exc()

        # Content-Length counts bytes on the wire, not characters.
        payload = response_body.encode("utf-8")

        response_headers = [
            ("Content-Type", response_type),
            ("Content-Length", str(len(payload))),
        ]
        start_response(response_code, response_headers)

        return [payload]

    def call(self):
        """Return the page HTML directly, without the WSGI wrapping."""
        return main(self._iq, self._oq, self._main_lock)