#!/usr/bin/python3
from bs4 import BeautifulSoup
import configparser
from datetime import datetime
from dateutil.parser import parse
import feedparser
import hashlib
import json
import os
import requests
import re
import sys
import time
import logging
from logging.config import fileConfig

appconfig = configparser.ConfigParser()
appconfig.read('blogs-i-read_v2.ini')
blogs_to_read = appconfig['blogsiread']['blogfile']
cronlinks_file = appconfig['blogsiread']['cronlinksfile']
feed_timeout = float(appconfig['blogsiread']['feedtimeout'])

fileConfig('logging_config.ini')
logger = logging.getLogger("blogs-i-read_v2")
if os.environ.get('LOGLEVEL'):
    logger.setLevel(level=os.environ.get('LOGLEVEL', 'WARNING').upper())

# List of blog entries; each entry carries 'url' and 'title', plus an optional 'feed' URL.
with open(blogs_to_read, 'r') as blogfile:
    blogs = json.load(blogfile)

# Offset in seconds used to shift UTC epoch timestamps to MET (UTC+1).
met_offset = 3600

# Cache of previously seen hashes, titles and timestamps, keyed by URL.
md5_sums = {}
try:
    with open('md5_sums.json', 'r') as file:
        md5_sums = json.load(file)
except Exception:
    logger.debug('could not open md5_sums.json')

# Dictionary to store the results
results = {}


# Fetch a feed URL (up to three attempts) and parse it with feedparser.
def get_feed_content(url):
    count = 0
    feed = ''
    while count <= 3:
        count += 1
        try:
            if count > 1:
                logger.debug(f"attempt {count} to read from {url}")
            response = requests.get(
                url,
                headers={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1)'},
                timeout=feed_timeout
            )
            response.encoding = 'utf-8'
            feed = feedparser.parse(response.text)
            break
        except Exception:
            if count == 3:
                break
    return feed


# Determine the newest post and the last-update timestamp for an RSS/Atom feed.
def examine_feed(url):
    (md5, post_title, post_url, last_update) = get_default_values(url)
    feed = get_feed_content(url)
    try:
        post_title = feed.entries[0].title
        post_url = feed.entries[0].link
        if 'theonlinephotographer' in post_url:
            try:
                post_url = feed.entries[0].feedburner_origlink
            except Exception:
                pass
        logger.debug(post_url)
        old_md5 = hashlib.md5(
            post_title.encode('utf-8') + feed.entries[0].updated.encode('utf-8')
        ).hexdigest()
        md5 = 'v2_' + hashlib.md5(
            post_title.encode('utf-8') + post_url.encode('utf-8')
        ).hexdigest()
        # make it dependent on change
        if url in md5_sums:
            logger.debug('existent feed')
            if md5_sums[url]['md5'] not in [md5, old_md5]:
                logger.debug('hashes NOT equal')
                last_update = int(time.mktime(datetime.utcnow().timetuple())) + met_offset
            else:
                logger.debug('new hash equal to old or new saved hashes')
                last_update = md5_sums[url]['timestamp']
        else:
            logger.debug('new feed')
    except Exception:
        logger.info(f'error when parsing feed {url}')
        # Fall back to whatever is stored in the cache.
        try:
            post_title = md5_sums[url]['current_title']
            post_url = md5_sums[url]['post_url']
            md5 = md5_sums[url]['md5']
            last_update = md5_sums[url]['timestamp']
        except Exception:
            pass
    logger.debug(f"last_update: {last_update}")
    return md5, post_title, post_url, last_update


# Site-specific scraping for photoplacegallery.com pages.
def examine_photoplacegallery(soup, url, md5):
    (post_title, post_url, last_update) = ['', '', 0]
    # logger.debug('examine_photoplacegallery')
    prothost = re.search(r'^https?://[\w.]*', url).group()
    firstah3 = soup.find_all('a', 'h3')[0]
    post_title = firstah3.string
    post_url = prothost + firstah3.get('href')
    if url in md5_sums:
        logger.debug(f'found {url} in md5_sums')
        if md5_sums[url]['md5'] != md5:
            logger.debug('md5 not equal')
            md5_sums[url]['md5'] = md5
            last_update = time.time()
        else:
            logger.debug('md5 equal')
            md5 = md5_sums[url]['md5']
            last_update = md5_sums[url]['timestamp']
    else:
        last_update = time.time()
    return md5, post_title, post_url, last_update


# Site-specific scraping for lfi-online.de pages.
def examine_lfionline(soup, url, md5):
    (post_title, post_url, last_update) = ['', '', time.time()]
    logger.debug('examine_lfionline')
    all_cards = soup.find_all(name="div", class_="card")
    for card in all_cards:
        # Skip cards whose image is an SVG placeholder (guard against a missing src attribute).
        if not card.find_all('img', src=lambda x: x and x.endswith('.svg')):
            post_url = card.find('a')['href']
            post_title = card.find(name="h3").text
            break
    if url in md5_sums:
        logger.debug(f'found {url} in md5_sums')
        if md5_sums[url]['md5'] != md5:
            logger.debug('md5 not equal')
            md5_sums[url]['md5'] = md5
        else:
            logger.debug('md5 equal')
            md5 = md5_sums[url]['md5']
            last_update = md5_sums[url]['timestamp']
    logger.debug(f"{post_title} {post_url} {last_update}")
    return md5, post_title, post_url, last_update


# Fallback for plain websites: only the page hash decides whether something changed.
def examine_generic_website(soup, url, md5):
    (post_title, post_url, last_update) = ['', '', 0]
    prothost = re.search(r'^https?://[\w.]*', url).group()
    logger.debug(url)
    if url in md5_sums:
        # logger.debug(f"found {url} in md5_sums: \n\t{md5_sums[url]['md5']} vs \n\t{md5}")
        if md5_sums[url]['md5'] != md5:
            logger.debug('md5 not equal')
            md5_sums[url]['md5'] = md5
            last_update = int(time.mktime(datetime.utcnow().timetuple())) + met_offset
        else:
            logger.debug('md5 equal')
            # logger.debug(md5_sums[url]['timestamp'])
            if md5_sums[url]['timestamp'] > 0:
                last_update = md5_sums[url]['timestamp']
            else:
                last_update = int(time.mktime(datetime.utcnow().timetuple())) + met_offset
    else:
        last_update = int(time.mktime(datetime.utcnow().timetuple())) + met_offset
    logger.debug(last_update)
    return md5, post_title, post_url, last_update


def get_default_values(url):
    # initialize variables, suitable for new urls
    (md5, post_title, post_url, last_update) = \
        ['', '', '', int(time.mktime(datetime.utcnow().timetuple())) + met_offset]
    if url in md5_sums:
        # get stored values if they exist
        try:
            md5 = md5_sums[url]['md5']
            last_update = md5_sums[url]['timestamp']
            post_title = md5_sums[url]['current_title']
            post_url = md5_sums[url]['post_url']
        except Exception:
            pass
    logger.debug(f"last_update: {last_update}")
    return (md5, post_title, post_url, last_update)


# Fetch a plain web page and dispatch to the matching site-specific handler.
def examine_url(url):
    (md5, post_title, post_url, last_update) = get_default_values(url)
    logger.debug(url)
    loaded_cookies = md5_sums.get(url, {}).get('cookies', {})
    try:
        response = requests.get(url, cookies=loaded_cookies, timeout=feed_timeout)
        saved_cookies = requests.utils.dict_from_cookiejar(response.cookies)
        cookies_json = json.dumps(saved_cookies, indent=4)
        # Create the cache entry first so storing cookies for a new URL cannot raise a KeyError.
        md5_sums.setdefault(url, {})['cookies'] = saved_cookies
        soup = BeautifulSoup(response.text, 'html.parser')
        # Despite the variable name, plain pages are hashed with SHA-256 of the body text.
        all_text = soup.body.get_text()
        md5 = hashlib.sha256(all_text.encode('utf-8')).hexdigest()
        body = soup.find('body')
        if 'lfi-online.de' in url:
            (md5, post_title, post_url, last_update) = examine_lfionline(soup, url, md5)
        elif "photoplacegallery.com" in url:
            (md5, post_title, post_url, last_update) = examine_photoplacegallery(soup, url, md5)
        else:
            (md5, post_title, post_url, last_update) = examine_generic_website(soup, url, md5)
    except Exception:
        logger.warning(f'Error in {url}')
    return md5, post_title, post_url, last_update


# Decide whether a URL is due for a fresh check, based on how old its cached
# timestamp is and on the current quarter of the hour.
def needs_update(url):
    if len(sys.argv) > 1:
        return True
    if url not in md5_sums:
        return True
    last_update = md5_sums[url]['timestamp']
    epoch = time.mktime(datetime.utcnow().timetuple()) + met_offset
    logger.debug(f"{last_update} - {epoch} : {((epoch - last_update)/3600):.1f} hours old")
    minute = datetime.utcfromtimestamp(epoch).minute
    # Quarter of the hour (0-3).
    quarter = 0
    if 15 <= minute < 30:
        quarter = 1
    elif 30 <= minute < 45:
        quarter = 2
    elif minute >= 45:
        quarter = 3
    diff = epoch - last_update
    if diff > 3600*24*7:
        # Older than a week: refresh only during the second quarter of the hour.
        return quarter == 1
    elif diff > 3600*18:
        return True
    elif diff > 3600*12:
        # Between 12 and 18 hours old: refresh every other quarter.
        return quarter % 2 == 1
    elif diff > 3600*6:
        return quarter == 1
    else:
        return False
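
# For reference, a sketch of one md5_sums.json cache entry as this script reads and
# writes it. The key names are the ones used in the code; the URL and the values
# below are illustrative placeholders only, not real data.
#
#   "https://example.com/blog/": {
#       "md5": "...",              # 'v2_' + MD5 for feeds, SHA-256 of the body text for plain pages
#       "timestamp": 1700000000,   # epoch seconds of the last detected change
#       "current_title": "Latest post title",
#       "post_url": "https://example.com/blog/latest-post",
#       "cookies": {}              # only stored for URLs fetched as plain pages
#   }
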
# Function to get the title, the MD5 hash of the HTML content, and the
# time of the last change for a given URL
def get_url_info(blog):
    if 'feed' in blog.keys():
        url = blog['feed']
    else:
        url = blog['url']
    if needs_update(url):
        logger.debug(f"{url} needs update")
        if 'feed' in blog.keys():
            (md5, post_title, post_url, last_update) = examine_feed(blog['feed'])
        else:
            (md5, post_title, post_url, last_update) = examine_url(blog['url'])
    else:
        logger.debug(f"{url} needs NO update")
        md5 = md5_sums[url]['md5']
        post_title = md5_sums[url]['current_title']
        post_url = md5_sums[url]['post_url']
        last_update = md5_sums[url]['timestamp']
    if url not in md5_sums.keys():
        md5_sums[url] = {}
    md5_sums[url]['post_url'] = post_url
    md5_sums[url]['current_title'] = post_title
    md5_sums[url]['md5'] = md5
    md5_sums[url]['timestamp'] = last_update
    results[url] = {
        'blog_url': blog['url'],
        'blog_title': blog['title'],
        'current_title': post_title,
        'post_url': post_url,
        'md5': md5,
        'last_update': last_update
    }


# ------------------------------------------------------------- main ---
filter = False
if len(sys.argv) > 1:
    filter = sys.argv[1]

# Loop through the list of URLs and call the function for each URL
for b in blogs:
    if filter:
        if filter in b['url']:
            get_url_info(b)
    else:
        get_url_info(b)

# # save results for development
# with open('results.json', 'w') as file:
#     json.dump(results, file, indent=4)

sorted_data = dict(sorted(results.items(), key=lambda x: x[1]["last_update"], reverse=True))

utc_time = datetime.utcnow()
epoch_time = int(time.mktime(utc_time.timetuple())) + met_offset
time_separator_flag = 0

with open(cronlinks_file, "w") as cronlinks:
    cronlinks.write("\n