#!/usr/bin/python3
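# blogs-i-read_v2.py: poll a list of blogs, fingerprint the latest post (via
# RSS/Atom feed where available, otherwise a scrape of the page), remember the
# MD5 fingerprints and change timestamps in md5_sums.json, and write an HTML
# <li>/<ul> link fragment grouped by how recently each blog changed.
#
# Rough shape of the expected input files (an assumption based on the keys
# used below, not a verified sample):
#   blogs-i-read_v2.ini   [blogsiread] section with blogfile= and cronlinksfile=
#   <blogfile> (JSON)     list of {"title": ..., "url": ..., "feed": ...(optional)}
#   md5_sums.json         {url: {"md5": ..., "timestamp": ...}}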
from bs4 import BeautifulSoup
import configparser
from datetime import datetime
from dateutil.parser import parse
import feedparser
import hashlib
import json
import requests
import re
import sys
import time
import logging
from logging.config import fileConfig

config = configparser.ConfigParser()
config.read('blogs-i-read_v2.ini')
blogs_to_read = config['blogsiread']['blogfile']
cronlinks_file = config['blogsiread']['cronlinksfile']

fileConfig('logging_config.ini')
logger = logging.getLogger("blogs-i-read_v2")

with open(blogs_to_read, 'r') as blogfile:
    blogs = json.load(blogfile)

met_offset = 3600  # one hour in seconds, added to UTC epoch values below

md5_sums = {}
try:
    # Read the JSON file containing the MD5 sums from the previous run
    with open('md5_sums.json', 'r') as file:
        md5_sums = json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
    logger.debug('could not open md5_sums.json, starting with an empty table')
# Dictionary to store the results
results = {}
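# get_timestamp: convert the various date formats found in feeds (ISO 8601
# with and without fractional seconds or numeric offsets, and RFC 822 style
# "Tue, 09 Jan 2024 ..." dates) into a Unix epoch value.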
def get_timestamp(ts):
    logger.debug(ts)
    if re.search(r'\dT\d\d:\d\d:\d\dZ$', ts):  # 2024-01-19T16:25:19Z
        return time.mktime(datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").timetuple())
    elif re.search(r'\dT\d\d:\d\d:\d\d[+\-]\d\d', ts):  # 2024-01-30T12:51:31-06:00
        return time.mktime(datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S%z").timetuple())
    elif re.search(r'\dT\d', ts):  # 2024-01-19T16:25:19.123+01:00
        return time.mktime(datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%f%z").timetuple())
    elif re.search(r'^\D\D\D.*GMT$', ts):  # Tue, 09 Jan 2024 14:15:58 GMT
        return time.mktime(datetime.strptime(ts, "%a, %d %b %Y %H:%M:%S GMT").timetuple())
    elif re.search(r'^\D\D\D,', ts):  # Thu, 01 Feb 2024 11:00:56 +0000
        return time.mktime(datetime.strptime(ts, "%a, %d %b %Y %H:%M:%S %z").timetuple())
    else:
        # last resort: let dateutil try before giving up
        try:
            return time.mktime(parse(ts).timetuple())
        except (ValueError, OverflowError):
            logger.error(f'unrecognised timestamp format: {ts}')
            sys.exit(1)
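# examine_feed: read the newest entry of an RSS/Atom feed, hash its title and
# "updated" stamp, and work out when the feed last changed relative to the
# hash stored from the previous run.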
def examine_feed(url):
    (md5, post_title, post_url, last_update) = ['', '', '', 0]
    # logger.debug(f'examine_feed {url}')
    try:
        feed = feedparser.parse(url)
        post_title = feed.entries[0].title
        md5 = hashlib.md5(post_title.encode('utf-8') + feed.entries[0].updated.encode('utf-8')).hexdigest()
        post_url = feed.entries[0].link
        # make the timestamp dependent on an actual change
        if url in md5_sums:
            # logger.debug(f'url {url} in md5_sums')
            if md5_sums[url]['md5'] != md5:
                # logger.debug('hashes NOT equal')
                utc_time = datetime.utcnow()
                last_update = int(time.mktime(utc_time.timetuple())) + met_offset
            else:
                # logger.debug('hashes are equal')
                if md5_sums[url]['timestamp'] < 1:
                    # logger.debug('first timestamp')
                    last_update = get_timestamp(feed.entries[0].updated)
                else:
                    # logger.debug('keep timestamp')
                    last_update = md5_sums[url]['timestamp']
        # logger.debug(f'{post_title} , {post_url}, {last_update}, {md5}')
    except Exception as e:
        logger.info(f'error when parsing feed {url}: {e}')
    return md5, post_title, post_url, last_update
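# examine_photoplacegallery: photoplacegallery.com has no feed, so scrape the
# first <a class="h3"> element for the newest title and link, and use the
# page-level MD5 to detect changes.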
def examine_photoplacegallery(soup, url, md5):
    (post_title, post_url, last_update) = ['', '', 0]
    # logger.debug('examine_photoplacegallery')
    prothost = re.search(r'^https?://[\w.]*', url).group()
    firstah3 = soup.find_all('a', 'h3')[0]
    post_title = firstah3.string
    post_url = prothost + firstah3.get('href')
    if url in md5_sums:
        logger.debug(f'found {url} in md5_sums')
        if md5_sums[url]['md5'] != md5:
            logger.debug('md5 not equal')
            md5_sums[url]['md5'] = md5
            last_update = time.time()
        else:
            logger.debug('md5 equal')
            md5 = md5_sums[url]['md5']
            last_update = md5_sums[url]['timestamp']
    else:
        last_update = time.time()
    return md5, post_title, post_url, last_update
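# examine_generic_website: for sites without a feed or a recognisable "latest
# post" element, change detection rests purely on the page-level MD5; no post
# title or post URL is extracted.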
def examine_generic_website(soup, url, md5):
    (post_title, post_url, last_update) = ['', '', 0]
    prothost = re.search(r'^https?://[\w.]*', url).group()
    if url in md5_sums:
        logger.debug(f"found {url} in md5_sums: \n\t{md5_sums[url]['md5']} vs \n\t{md5}")
        if md5_sums[url]['md5'] != md5:
            logger.debug('md5 not equal')
            md5_sums[url]['md5'] = md5
            last_update = time.time()
        else:
            logger.debug('md5 equal')
            last_update = md5_sums[url]['timestamp']
    else:
        last_update = time.time()
    return md5, post_title, post_url, last_update
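# examine_url: fetch a page, hash it, and dispatch to the site-specific
# handler. Some sites are hashed on the visible body text instead of the raw
# HTML, presumably because their markup changes on every fetch.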
def examine_url(url):
    logger.debug(url)
    (md5, post_title, post_url, last_update) = ['', '', '', 0]
    # sites whose raw HTML is too noisy, so the body text is hashed instead
    text_hash_sites = ("picturesfromthezone", "magnumphotos", "robdeloephotography",
                       "donttakepictures.com", "mikepeters-photography.com")
    # sites handled by examine_generic_website (page-level hash only)
    generic_sites = ("claudioturri.it", "camerawork.de", "jeanlucfeixa",
                     "rudyortega.com", "zauber-allenthalben") + text_hash_sites
    try:
        # timeout added so a hanging site does not stall the whole run
        response = requests.get(url, timeout=30)
        md5 = hashlib.md5(response.content).hexdigest()  # MD5 hash of the raw page
        soup = BeautifulSoup(response.text, 'html.parser')
        body = soup.find('body')
        if "photoplacegallery.com" in url:
            (md5, post_title, post_url, last_update) = examine_photoplacegallery(soup, url, md5)
        elif any(site in url for site in generic_sites):
            if any(site in url for site in text_hash_sites):
                md5 = hashlib.md5(body.get_text().encode('utf-8')).hexdigest()
            (md5, post_title, post_url, last_update) = examine_generic_website(soup, url, md5)
        else:
            logger.info(f"needs treatment: {url}")
    except Exception as e:
        logger.info(f'error when examining {url}: {e}')
    return md5, post_title, post_url, last_update
# Function to get the title, the MD5 hash of the content, and the time of the
# last change for a given blog, preferring its feed when one is configured
def get_url_info(blog):
    if 'feed' in blog.keys():
        (md5, post_title, post_url, last_update) = examine_feed(blog['feed'])
        url = blog['feed']
    else:
        (md5, post_title, post_url, last_update) = examine_url(blog['url'])
        url = blog['url']
    # Compare the MD5 hash with the one from the JSON file and only store a
    # new hash/timestamp pair when the content actually changed
    if url not in md5_sums or md5_sums[url]['md5'] != md5:
        md5_sums[url] = {'md5': md5, 'timestamp': last_update}
    results[url] = {'blog_url': blog['url'],
                    'blog_title': blog['title'],
                    'current_title': post_title,
                    'post_url': post_url,
                    'md5': md5,
                    'last_update': last_update}
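# An optional command-line argument restricts the run to blogs whose URL
# contains the given substring, e.g. "python3 blogs-i-read_v2.py magnumphotos".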
url_filter = False
if len(sys.argv) > 1:
    url_filter = sys.argv[1]

# Loop through the list of blogs and call the function for each one
for b in blogs:
    if url_filter:
        if url_filter in b['url']:
            get_url_info(b)
    else:
        get_url_info(b)

# # save results for development
# with open('results.json', 'w') as file:
#     json.dump(results, file, indent=4)
sorted_data = dict(sorted(results.items(), key=lambda x: x[1]["last_update"], reverse=True))
utc_time = datetime.utcnow()
epoch_time = int(time.mktime(utc_time.timetuple())) + met_offset  # offset for american blogs
time_separator_flag = 0
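# Write the HTML fragment: a flat list of blog/post links split by recency
# headings. The thresholds are in seconds: 150000 is roughly 1.7 days, 815000
# roughly 9.4 days, 3188967 roughly 37 days, and ten times that roughly a year.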
with open(cronlinks_file, "w") as cronlinks:
    cronlinks.write("\n<li style='font-weight: bold;'>Hot from the Blogosphere</li>\n\t<ul>\n")
    for r in sorted_data:
        if not sorted_data[r]['current_title']:
            sorted_data[r]['current_title'] = ''

        lupd = sorted_data[r]['last_update']
        if epoch_time - lupd > 10 * 3188967:
            if time_separator_flag < 4:
                cronlinks.write("</ul>\n<li style='font-weight: bold;'>From medieval ages</li>\n\t<ul>\n")
                time_separator_flag = 4
        elif epoch_time - lupd > 3188967:
            if time_separator_flag < 3:
                cronlinks.write("</ul>\n<li style='font-weight: bold;'>A month and older</li>\n\t<ul>\n")
                time_separator_flag = 3
        elif epoch_time - lupd > 815000:
            if time_separator_flag < 2:
                cronlinks.write("</ul>\n<li style='font-weight: bold;'>A week and older</li>\n\t<ul>\n")
                time_separator_flag = 2
        elif epoch_time - lupd > 150000:
            if time_separator_flag < 1:
                cronlinks.write("</ul>\n<li style='font-weight: bold;'>A day and older</li>\n\t<ul>\n")
                time_separator_flag = 1
        sdr = sorted_data[r]
        cronlinks.write(f"\t<li><a href='{sdr['blog_url']}' target='_blank'>{sdr['blog_title']}</a>" +
                        " // " +
                        f"<a href='{sdr['post_url']}' target='_blank'>{sdr['current_title']}</a></li>\n")
        # \t<!-- {datetime.fromtimestamp(lupd)} // {epoch_time} - {lupd} = {epoch_time - lupd} :: {time_separator_flag} -->\n"

# save hashes and timestamps so the next run can detect changes
with open('md5_sums.json', 'w') as file:
    json.dump(md5_sums, file, indent=4)
# local variables:
# compile-command: "python3 blogs-i-read_v2.py"
# end: