/ index / WikiBlogger
Here is the script that uploads my emacs-wiki pages to Blogger.
#!/usr/bin/env python ''' A tool to convert the output of emacs-wiki to blogger entries. The basic idea is to automatically post the html pages created by emacs-wiki to a blogger account. The following abilities need to be provided: - post only if local copy is newer, - rewrite links so that they still work online, - sanitize the html so that blogger will accept it. To make it easy I will use google's python gdata api interface. Possible milestones: 1: Post without rewriting or sanitizing, 2: Add sanitizing 3: Add link rewriting Assumptions: - blog has archive=none so that pages never change address. ''' __author__ = 'kwhitefo@gmail.com <Kevin Whitefoot>' # Python imports import os import re import commands import sys # gdata imports import gdata.service import gdata.blogger.client import gdata.client import gdata.sample_util import gdata.data import atom.data class WikiBlogger: def __init__(self, wiki_dir, user, pw, connect): """Creates a GDataService and provides ClientLogin auth details to it. The email and password are required arguments for ClientLogin. The 'source' defined below is an arbitrary string, but should be used to reference your name or the name of your organization, the app name and version, with '-' between each of the three values. The connect argument allows offline testing. """ # Load the dictionary that holds the association between # filenames and post ids self.wiki_dir = wiki_dir self.blogger_post_ids_fn = os.path.join(self.wiki_dir, '.blogger-post-ids') self.LoadPostIds() if connect: # Authenticate using ClientLogin. self.client = gdata.blogger.client.BloggerClient() service='blogger' source='kjw-wikiblogger-0.1', self.client.client_login(user, pw, source=source, service=service) # Get the blog ID for the first blog. feed = self.client.get_blogs() self.blog = feed.entry[0] self.blog_id = self.blog.get_blog_id() def PrintUserBlogTitles(self): """Prints a list of all the user's blogs.""" # Request the feed. 
feed = self.client.get_blogs() # Print the results. print feed.title.text for entry in feed.entry: print "\t" + entry.title.text print def CreatePost(self, title, content, is_draft): """This method creates a new post on a blog. The new post can be stored as a draft or published based on the value of the is_draft parameter. The method creates an GDataEntry for the new post using the title, content, author_name and is_draft parameters. With is_draft, True saves the post as a draft, while False publishes the post. Then it uses the given GDataService to insert the new post. If the insertion is successful, the added post (GDataEntry, gdata-2.0.7/pydocs/gdata.html#GDataEntry) will be returned. """ return self.client.add_post(self.blog_id, title, content, draft=is_draft) def LoadPosts(self): """ Requests the posts feed for the blogs and returns the entries. """ # Request the feed. feed = self.client.get_posts(self.blog_id) self.entries = {} for entry in feed.entry: self.entries[entry.post_id] = entry def PrintPostsInDateRange(self, start_time, end_time): """This method displays the title and modification time for any posts that have been created or updated in the period between the start_time and end_time parameters. The method creates the query, submits it to the GDataService, and then displays the results. Note that while the start_time is inclusive, the end_time is exclusive, so specifying an end_time of '2007-07-01' will include those posts up until 2007-6-30 11:59:59PM. The start_time specifies the beginning of the search period (inclusive), while end_time specifies the end of the search period (exclusive). """ # Create query and submit a request. query = gdata.blogger.client.Query(updated_min=start_time, updated_max=end_time, order_by='updated') print query.updated_min print query.order_by feed = self.client.get_posts(self.blog_id, query=query) # Print the results. 
print feed.title.text + " posts between " + start_time + " and " + end_time print feed.title.text for entry in feed.entry: if not entry.title.text: print "\tNo Title" else: print "\t" + entry.title.text print def UpdatePostTitle(self, entry_to_update, new_title): """This method updates the title of the given post. The GDataEntry object is updated with the new title, then a request is sent to the GDataService. If the insertion is successful, the updated post will be returned. Note that other characteristics of the post can also be modified by updating the values of the entry object before submitting the request. The entry_to_update is a GDatEntry containing the post to update. The new_title is the text to use for the post's new title. Returns: a GDataEntry containing the newly-updated post. """ # Set the new title in the Entry object entry_to_update.title = atom.data.Title(type='xhtml', text=new_title) return self.client.update(entry_to_update) def CreateComment(self, post_id, comment_text): """This method adds a comment to the specified post. First the comment feed's URI is built using the given post ID. Then a GDataEntry is created for the comment and submitted to the GDataService. The post_id is the ID of the post on which to post comments. The comment_text is the text of the comment to store. Returns: an entry containing the newly-created comment NOTE: This functionality is not officially supported yet. """ return self.client.add_comment(self.blog_id, post_id, comment_text) def PrintAllComments(self, post_id): """This method displays all the comments for the given post. First the comment feed's URI is built using the given post ID. Then the method requests the comments feed and displays the results. Takes the post_id of the post on which to view comments. 
""" feed = self.client.get_post_comments(self.blog_id, post_id) # Display the results print feed.title.text for entry in feed.entry: print "\t" + entry.title.text print "\t" + entry.updated.text print def DeleteComment(self, comment_entry): """This method removes the comment specified by the given edit_link_href, the URI for editing the comment. """ self.client.delete(comment_entry) def DeletePost(self, post_entry): """This method removes the post specified by the given edit_link_href, the URI for editing the post. """ self.client.delete(post_entry) def LoadPostIds(self): """Load the file that associates files and postids. """ fn = self.blogger_post_ids_fn if os.path.exists(fn): f = open(fn, "r") t = f.read() f.close() try: self.post_ids = eval(t) except: print "Warning corrupt status file, resetting" self.post_ids = {} else: print "post_ids file does not exist: ", fn self.post_ids = {} def SavePostIds(self): """Saves the file that associates files and postids. """ fn = self.blogger_post_ids_fn f = open(fn, "w") f.write(self.post_ids.__repr__()) f.close() def Upload(self, max_files): """Upload changed files. Max_files allows us to pace ourselves to avoid hitting the Blogger limit of 50 upload a day. """ for fn in os.listdir(self.wiki_dir): print fn if self.qUpload(fn): max_files -= 1 if max_files <= 0: print "Uploaded max. files" break def qUpload(self, fn): """Upload file if newer than entry on blog. """ id_time = self.post_ids.get(fn) full_name = os.path.join(self.wiki_dir, fn) mtime = os.stat(full_name).st_mtime post = None print "Upload or update: ", fn if id_time is None: # not present so upload post = self.UploadOne(full_name) else: # Present, check if changed if len(id_time) == 2: # Old style post_ids file did not have posted_url. 
posted_url = "" posted_id, post_time = id_time else: posted_id, posted_url, post_time = id_time if post_time < mtime: # present but out of date post = self.UpdateOne(full_name, posted_id) if post is None: # Actually it wasn't present after all, presumably # deleted by owner. post = self.UploadOne(full_name) else: print "Already up to date" if post is None: # didn't do anything return False else: # uploaded or updated so update the record self.post_ids[fn] = (post.get_post_id(), post.FindAlternateLink(), mtime) self.SavePostIds() return True def UploadOne(self, full_name): """Upload file. """ print "Upload ", full_name title, body = self.LoadAndRewrite(full_name) post = self.CreatePost(title, body, False) print "Successfully created public post: \"" + post.title.text + "\".\n" print post.__str__ # Get the post ID. To enable us to update later we need # this to be associated with the file. return post def LoadAndRewrite(self, full_name): """ Use tidy to ensure that non-Ascii characters are replaced with entities. If you don't the Blogger API might choke on non-UTF8 characters. """ #f = open(full_name, "r") #html = f.read() status, html = commands.getstatusoutput('tidy "' + full_name + '"') print "Status: ", status #print "html: " , html title = re.findall("<title>(.*)</title>", html) body = re.findall(r"<!-- Page published by Emacs Wiki begins here -->(.*)</body>", html, re.DOTALL) #print "body: ", body body = self.RewriteLinks(body[0].strip()) return title[0].strip(), body def RewriteLinks(self, html): """Replace the local hrefs with the addresses recorded in the post_ids. """ #print "ids" #print "html: ", html #print self.post_ids.items() for item in self.post_ids.items(): fn, ids = item if 2 < len(ids): # Original did not have url html = self.RewriteLink(html, fn, ids[1]) return html def RewriteLink(self, html, filename, replacement): """Replace the local hrefs with the addresses recorded in the post_ids. Is there a more efficient way? 
Could create a pattern for each filename when we load the post_ids. However as the principal use case is the updating of one or very few files this won't save much time. """ #print "RewriteLink", filename, replacement search_pattern = r'(<a *href *= *")' + filename + r'(" *>.*</a>)' #print "ps: ", search_pattern pattern = re.compile(search_pattern) #print "Pattern: ", str(search_pattern) return re.sub(search_pattern, r"\1" + replacement + r"\2", html) def GetpostByID(self, post_id): """Fetch a post to be updated. See http://stackoverflow.com/questions/2152112/blogger-python-api-how-do-i-retrieve-a-post-by-post-id, http://blog.oddbit.com/2010/01/retrieving-blogger-posts-by-post-id.html """ print "GetpostByID", post_id try: return self.client.get_feed( self.blog.get_post_link().href + '/%s' % post_id, auth_token=self.client.auth_token, desired_class=gdata.blogger.data.BlogPost) except gdata.client.RequestError, inst: print "Exception thrown:" print type(inst) # the exception instance print inst # __str__ allows args to printed directly print "dir: ", dir(inst) return None except Exception, inst: print "Failed to get post by id for unexpected reason." print inst # __str__ allows args to printed directly print "dir: ", dir(inst) raise # do not handle def UpdateOne(self, full_name, post_id): """Update a post. 
""" print "Update ", full_name f = open(full_name, "r") t = f.read() post = self.GetpostByID(post_id) # print "post: ", post # print "dir: ", dir(post) # print "link:", post.GetSelfLink() # print "link:", post.GetPostLink() # print "link:", post.link # print "link:", post.get_html_link() # print "link:", post.FindUrl() print "url: ", post.FindAlternateLink() #print "link:", post.find_self_link() if post is None: return None post.text = t post.AddLabel("wikiblogger") self.client.update(post) return post def main(): """ """ src = sys.argv[1] user = sys.argv[2] pw = sys.argv[3] print "src: ", src print "user: ", user print "pw: ", pw wb = WikiBlogger(os.path.expanduser(src), user, pw, True) # TODO: externalise wb.Upload(10) if __name__ == '__main__': main()
No comments:
Post a Comment