/ index / WikiBlogger
Here is the script that uploads my emacs-wiki pages to Blogger.
#!/usr/bin/env python
'''
A tool to convert the output of emacs-wiki to blogger entries.
The basic idea is to automatically post the html pages created by
emacs-wiki to a blogger account.
The following abilities need to be provided:
- post only if local copy is newer,
- rewrite links so that they still work online,
- sanitize the html so that blogger will accept it.
To make it easy I will use google's python gdata api interface.
Possible milestones:
1: Post without rewriting or sanitizing,
2: Add sanitizing
3: Add link rewriting
Assumptions:
- blog has archive=none so that pages never change address.
'''
__author__ = 'kwhitefo@gmail.com <Kevin Whitefoot>'
# Python imports
import ast
import commands
import os
import pipes
import re
import sys
# gdata imports
import gdata.service
import gdata.blogger.client
import gdata.client
import gdata.sample_util
import gdata.data
import atom.data
class WikiBlogger:
    def __init__(self, wiki_dir, user, pw, connect):
        """Create a BloggerClient and optionally log in via ClientLogin.

        wiki_dir -- directory holding the emacs-wiki HTML output.
        user, pw -- ClientLogin credentials (email and password).
        connect  -- when False, skip authentication to allow offline
                    testing.

        The 'source' string identifies this application to Google and
        by convention is 'yourname-appname-version'.
        """
        # Load the dictionary that holds the association between
        # filenames and post ids.
        self.wiki_dir = wiki_dir
        self.blogger_post_ids_fn = os.path.join(self.wiki_dir, '.blogger-post-ids')
        self.LoadPostIds()
        if connect:
            # Authenticate using ClientLogin.
            self.client = gdata.blogger.client.BloggerClient()
            # BUG FIX: the original assigned the source string with a
            # trailing comma, which silently made 'source' a one-element
            # tuple instead of a string.
            self.client.client_login(user, pw,
                                     source='kjw-wikiblogger-0.1',
                                     service='blogger')
            # Cache the first blog's entry and ID; all other methods
            # operate on this blog.
            feed = self.client.get_blogs()
            self.blog = feed.entry[0]
            self.blog_id = self.blog.get_blog_id()
def PrintUserBlogTitles(self):
"""Prints a list of all the user's blogs."""
# Request the feed.
feed = self.client.get_blogs()
# Print the results.
print feed.title.text
for entry in feed.entry:
print "\t" + entry.title.text
print
def CreatePost(self, title, content, is_draft):
    """Create a new post on the blog and return its entry.

    title    -- post title text.
    content  -- post body (HTML).
    is_draft -- True stores the post as an unpublished draft; False
                publishes it immediately.

    Returns the entry the Blogger service created for the post.
    """
    new_entry = self.client.add_post(self.blog_id, title, content,
                                     draft=is_draft)
    return new_entry
def LoadPosts(self):
    """Fetch the blog's posts feed and cache its entries.

    Populates self.entries, a dict mapping each post's id to its
    feed entry.
    """
    posts_feed = self.client.get_posts(self.blog_id)
    self.entries = dict((e.post_id, e) for e in posts_feed.entry)
def PrintPostsInDateRange(self, start_time, end_time):
"""This method displays the title and modification time for any posts that
have been created or updated in the period between the start_time and
end_time parameters. The method creates the query, submits it to the
GDataService, and then displays the results.
Note that while the start_time is inclusive, the end_time is exclusive, so
specifying an end_time of '2007-07-01' will include those posts up until
2007-6-30 11:59:59PM.
The start_time specifies the beginning of the search period (inclusive),
while end_time specifies the end of the search period (exclusive).
"""
# Create query and submit a request.
query = gdata.blogger.client.Query(updated_min=start_time,
updated_max=end_time,
order_by='updated')
print query.updated_min
print query.order_by
feed = self.client.get_posts(self.blog_id, query=query)
# Print the results.
print feed.title.text + " posts between " + start_time + " and " + end_time
print feed.title.text
for entry in feed.entry:
if not entry.title.text:
print "\tNo Title"
else:
print "\t" + entry.title.text
print
def UpdatePostTitle(self, entry_to_update, new_title):
    """Replace a post's title and submit the change to Blogger.

    entry_to_update -- the entry of the post to modify.
    new_title       -- replacement title text.

    Returns the updated entry.  Other characteristics of the post can
    be changed the same way: set them on the entry before submitting.
    """
    entry_to_update.title = atom.data.Title(type='xhtml', text=new_title)
    updated_entry = self.client.update(entry_to_update)
    return updated_entry
def CreateComment(self, post_id, comment_text):
    """Add a comment to the post identified by post_id.

    post_id      -- ID of the post to comment on.
    comment_text -- text of the comment.

    Returns the entry for the newly created comment.
    NOTE: comment posting is not officially supported by the API yet.
    """
    return self.client.add_comment(self.blog_id, post_id, comment_text)
def PrintAllComments(self, post_id):
"""This method displays all the comments for the given post. First the
comment feed's URI is built using the given post ID. Then the method
requests the comments feed and displays the results. Takes the post_id
of the post on which to view comments.
"""
feed = self.client.get_post_comments(self.blog_id, post_id)
# Display the results
print feed.title.text
for entry in feed.entry:
print "\t" + entry.title.text
print "\t" + entry.updated.text
print
def DeleteComment(self, comment_entry):
    """Delete the given comment from the blog.

    comment_entry -- the comment's feed entry; the client derives the
    edit URI from the entry itself.
    """
    self.client.delete(comment_entry)
def DeletePost(self, post_entry):
    """Delete the given post from the blog.

    post_entry -- the post's feed entry; the client derives the edit
    URI from the entry itself.
    """
    self.client.delete(post_entry)
def LoadPostIds(self):
    """Load self.post_ids from the status file.

    self.post_ids maps a wiki filename to the tuple recorded at upload
    time: (post_id, posted_url, mtime) -- or (post_id, mtime) for
    records written by an older version of this script.  A missing or
    corrupt status file resets the mapping to empty.
    """
    fn = self.blogger_post_ids_fn
    if os.path.exists(fn):
        with open(fn, "r") as f:
            t = f.read()
        try:
            # SECURITY FIX: the file is a repr() of a plain dict, so
            # parse it with ast.literal_eval instead of eval(), which
            # would execute arbitrary code from a tampered file.
            self.post_ids = ast.literal_eval(t)
        except (ValueError, SyntaxError):
            print("Warning corrupt status file, resetting")
            self.post_ids = {}
    else:
        print("post_ids file does not exist: %s" % fn)
        self.post_ids = {}
def SavePostIds(self):
    """Write self.post_ids to the status file as a repr() literal.

    The loader parses the file back with a literal parser, so
    post_ids must contain only plain literals (dicts, tuples,
    strings, numbers).
    """
    fn = self.blogger_post_ids_fn
    # 'with' guarantees the handle is flushed and closed even if the
    # write fails; repr() is the idiomatic spelling of __repr__().
    with open(fn, "w") as f:
        f.write(repr(self.post_ids))
def Upload(self, max_files):
"""Upload changed files. Max_files allows us to pace
ourselves to avoid hitting the Blogger limit of 50 upload a
day.
"""
for fn in os.listdir(self.wiki_dir):
print fn
if self.qUpload(fn):
max_files -= 1
if max_files <= 0:
print "Uploaded max. files"
break
def qUpload(self, fn):
    """Upload or update one wiki file if it is newer than the blog copy.

    fn is a filename relative to self.wiki_dir.  Returns True when a
    post was created or updated (and the post_ids record re-saved),
    False when nothing needed doing.
    """
    # Record for this file: (post_id, mtime) old style, or
    # (post_id, posted_url, mtime) new style; None if never uploaded.
    id_time = self.post_ids.get(fn)
    full_name = os.path.join(self.wiki_dir, fn)
    mtime = os.stat(full_name).st_mtime
    post = None
    print "Upload or update: ", fn
    if id_time is None:
        # not present so upload
        post = self.UploadOne(full_name)
    else:
        # Present, check if changed
        if len(id_time) == 2:
            # Old style post_ids file did not have posted_url.
            posted_url = ""
            posted_id, post_time = id_time
        else:
            posted_id, posted_url, post_time = id_time
        if post_time < mtime:
            # present but out of date
            post = self.UpdateOne(full_name, posted_id)
            if post is None:
                # Actually it wasn't present after all, presumably
                # deleted by owner.
                post = self.UploadOne(full_name)
        else:
            print "Already up to date"
    if post is None:
        # didn't do anything
        return False
    else:
        # uploaded or updated so update the record
        self.post_ids[fn] = (post.get_post_id(), post.FindAlternateLink(), mtime)
        self.SavePostIds()
        return True
def UploadOne(self, full_name):
"""Upload file.
"""
print "Upload ", full_name
title, body = self.LoadAndRewrite(full_name)
post = self.CreatePost(title, body, False)
print "Successfully created public post: \"" + post.title.text + "\".\n"
print post.__str__
# Get the post ID. To enable us to update later we need
# this to be associated with the file.
return post
def LoadAndRewrite(self, full_name):
"""
Use tidy to ensure that non-Ascii characters are replaced with
entities. If you don't the Blogger API might choke on
non-UTF8 characters.
"""
#f = open(full_name, "r")
#html = f.read()
status, html = commands.getstatusoutput('tidy "' + full_name + '"')
print "Status: ", status
#print "html: " , html
title = re.findall("<title>(.*)</title>", html)
body = re.findall(r"<!-- Page published by Emacs Wiki begins here -->(.*)</body>",
html, re.DOTALL)
#print "body: ", body
body = self.RewriteLinks(body[0].strip())
return title[0].strip(), body
def RewriteLinks(self, html):
    """Rewrite local hrefs in html to the blog URLs held in post_ids.

    Returns the rewritten html.
    """
    for fn, ids in self.post_ids.items():
        # Old-style records are only (post_id, mtime) -- no URL was
        # recorded, so those links cannot be rewritten.
        if len(ids) > 2:
            html = self.RewriteLink(html, fn, ids[1])
    return html
def RewriteLink(self, html, filename, replacement):
    """Replace hrefs pointing at the local *filename* with *replacement*.

    Only anchors of the form <a href="filename">...</a> are touched;
    the href value is swapped while the rest of the anchor is kept.
    Could pre-compile a pattern per filename when post_ids loads, but
    the principal use case updates only a few files, so the saving
    would be small.
    """
    # BUG FIX: escape the filename so regex metacharacters in it (the
    # '.' in 'foo.html' at minimum) match literally instead of
    # matching any character.
    search_pattern = (r'(<a *href *= *")' + re.escape(filename) +
                      r'(" *>.*</a>)')
    # NOTE(review): a replacement containing backslashes would be
    # misread by re.sub as group references; blog URLs should never
    # contain any.
    return re.sub(search_pattern, r"\1" + replacement + r"\2", html)
def GetpostByID(self, post_id):
"""Fetch a post to be updated. See
http://stackoverflow.com/questions/2152112/blogger-python-api-how-do-i-retrieve-a-post-by-post-id,
http://blog.oddbit.com/2010/01/retrieving-blogger-posts-by-post-id.html
"""
print "GetpostByID", post_id
try:
return self.client.get_feed(
self.blog.get_post_link().href + '/%s' % post_id,
auth_token=self.client.auth_token,
desired_class=gdata.blogger.data.BlogPost)
except gdata.client.RequestError, inst:
print "Exception thrown:"
print type(inst) # the exception instance
print inst # __str__ allows args to printed directly
print "dir: ", dir(inst)
return None
except Exception, inst:
print "Failed to get post by id for unexpected reason."
print inst # __str__ allows args to printed directly
print "dir: ", dir(inst)
raise # do not handle
def UpdateOne(self, full_name, post_id):
"""Update a post.
"""
print "Update ", full_name
f = open(full_name, "r")
t = f.read()
post = self.GetpostByID(post_id)
# print "post: ", post
# print "dir: ", dir(post)
# print "link:", post.GetSelfLink()
# print "link:", post.GetPostLink()
# print "link:", post.link
# print "link:", post.get_html_link()
# print "link:", post.FindUrl()
print "url: ", post.FindAlternateLink()
#print "link:", post.find_self_link()
if post is None:
return None
post.text = t
post.AddLabel("wikiblogger")
self.client.update(post)
return post
def main():
"""
"""
src = sys.argv[1]
user = sys.argv[2]
pw = sys.argv[3]
print "src: ", src
print "user: ", user
print "pw: ", pw
wb = WikiBlogger(os.path.expanduser(src), user, pw, True) # TODO: externalise
wb.Upload(10)
if __name__ == '__main__':
main()
No comments:
Post a Comment