Using Celery for automating digital preservation workflows

At UML, one of my responsibilities is creating BagIt archives that are uploaded to the Academic Preservation Trust (APTrust). APTrust is a consortium of academic institutions that uses Amazon S3 and Glacier for long-term digital preservation of content.

In 2014, I wrote some software that was used to test APTrust before it entered production. It was a Sinatra-based application with a wizard-like interface that automated the process of downloading content from an archive server, entering metadata, bagging it, and uploading it to S3.

Now that APTrust is in production, we are beginning to upload content in earnest.

I began to realize that the wizard-like interface would not scale to the size and breadth of the content we plan to upload, so I started looking for alternatives.

I rewrote the software that creates our bags in Python, using the BagIt Python library. The resulting utility, trusty, turns folders into bags that adhere to the APTrust bagging profile. Along with this, I began using Celery as a task queue so that collection uploads can be scheduled en masse instead of one at a time.
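
To make the bagging step concrete, here is a minimal sketch of what turning a folder into a bag involves, written with only the standard library. It is not trusty and it does not use the BagIt Python library. The function names, the SHA-256 manifest, and the BagIt-Version value are assumptions chosen for illustration, and the extra tag files that the APTrust profile calls for are left out.

```python
import hashlib
import shutil
from pathlib import Path

# Assumption: the declared version is illustrative; use whatever your profile requires.
BAGIT_VERSION = "0.97"


def sha256_of(path, chunk_size=1024 * 1024):
    """Hash a file in chunks so large assets are never read into memory at once."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def make_minimal_bag(source_dir, bag_dir):
    """Copy source_dir into a new bag at bag_dir and write the basic tag files."""
    bag = Path(bag_dir)
    payload = bag / "data"
    # The payload lives under data/; copytree creates bag_dir as needed.
    shutil.copytree(Path(source_dir), payload)

    # Bag declaration file.
    (bag / "bagit.txt").write_text(
        f"BagIt-Version: {BAGIT_VERSION}\nTag-File-Character-Encoding: UTF-8\n",
        encoding="utf-8",
    )

    # Payload manifest: one "<checksum>  <path relative to the bag>" line per file.
    files = sorted(p for p in payload.rglob("*") if p.is_file())
    manifest = "".join(
        f"{sha256_of(p)}  {p.relative_to(bag).as_posix()}\n" for p in files
    )
    (bag / "manifest-sha256.txt").write_text(manifest, encoding="utf-8")
    return bag
```

Calling `make_minimal_bag("collection_folder", "collection_bag")` (both paths hypothetical) leaves the original folder untouched and produces a directory holding `bagit.txt`, `manifest-sha256.txt`, and a `data/` copy of the content.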

Celery has a lot of benefits, such as better logging and the ability to run tasks concurrently.

I haven’t published this software on GitHub yet, but I’ve been using it in a production capacity to upload around 3 terabytes of digital assets into APTrust so far.

Since we use Slack at UML, I was able to pipe the notifications and logging that I’ve set up in the Celery tasks into a Slack channel to monitor the service. With the Python requests library, this was really easy to set up:


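import json

import requests


def post_to_slack(text):
    """Post a message to the #aptrust channel through a Slack incoming webhook."""
    # Placeholder webhook URL; keep the real one out of version control.
    url = "https://hooks.slack.com/services/secretkey"
    payload = {
        "channel": "#aptrust",
        "username": "APTrust Upload Bot",
        "text": text,
        "icon_emoji": ":shopping_bags:",
    }
    # Slack incoming webhooks accept a JSON request body.
    headers = {"Content-Type": "application/json"}
    # The timeout keeps a stalled request from blocking a worker indefinitely.
    response = requests.post(url, data=json.dumps(payload), headers=headers, timeout=10)
    return response.text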
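
One way to route ordinary log messages through a function like post_to_slack is a small logging handler. This is a sketch of that idea and not necessarily how the production tasks are wired up. The `CallbackHandler` class, the `build_logger` helper, and the logger name are hypothetical.

```python
import logging


class CallbackHandler(logging.Handler):
    """Forward each formatted log record to a callable such as post_to_slack."""

    def __init__(self, send, level=logging.INFO):
        super().__init__(level=level)
        self.send = send

    def emit(self, record):
        try:
            self.send(self.format(record))
        except Exception:
            # A failed notification should never crash the task that logged it.
            self.handleError(record)


def build_logger(send, name="aptrust"):
    """Return a logger whose INFO-and-above messages are passed to send()."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    handler = CallbackHandler(send)
    handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
    logger.addHandler(handler)
    return logger
```

With `log = build_logger(post_to_slack)`, a call such as `log.info("Uploaded %s", bag_name)` inside a task would show up in the Slack channel, while debug-level messages stay out of it.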