I have a Python based flask app where I am using the Celery task queue to handle a set of e-mail tasks. I would like the Celery task to be able to query a sqlite database that I have tied into the whole app to pull in and use certain data but I keep getting this following error. If I pull out the one line in the celery task that queries my SQLite database, the task then executes without throwing this error so my assumption is that I am making a fundamental error regarding tying celery and my database together.
[2015-07-18 21:36:25,168: ERROR/MainProcess] Process 'Worker-1' pid:6657 exited with 'signal 11 (SIGSEGV)'
[2015-07-18 21:36:25,187: ERROR/MainProcess] Task app.views.send_mail_task[4c3d5b1a-5ac3-4ab8-b633-b925eba5dd02] raised unexpected: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV).',)
Traceback (most recent call last):
File "/Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/billiard/pool.py", line 1171, in mark_as_worker_lost
WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV).
The relevant code is as follows:
from datetime import datetime
from flask.ext.bcrypt import generate_password_hash
from flask.ext.login import UserMixin
from peewee import *
import config
DATABASE = SqliteDatabase('danish.db')
class Company(Model):
dont_contact_anymore = BooleanField(default=False)
company_name = CharField()
website = CharField(unique=True)
email_address = CharField()
country = CharField()
scraped_on = DateTimeField(formats="%m-%d-%Y")
have_contacted = BooleanField(default=False)
current_pipeline_phase = IntegerField(default=0)
sector = CharField()
contacted_by = ForeignKeyField(
class Meta:
database = DATABASE
order_by = ('have_contacted',)
def create_company(cls, company_name, website, email_address):
with DATABASE.transaction():
cls.create(company_name=company_name, website=website, email_address=email_address, scraped_on=datetime.now)
print 'Saved {}'.format(company_name)
except IntegrityError:
print '{} already exists in the database'.format(company_name)
def initialize():
DATABASE.create_tables([Company, User, Post],safe=True)
function start_the_magic() {
//Get the ids of the companies to contact and remove
var ids = get_the_ids();
var contact = ids[0]
var remove = ids[1]
type: 'POST',
url: '/email_task',
data: JSON.stringify({contact: contact, remove: remove}),
contentType: "application/json; charset=utf-8",
dataType: 'json',
success: function(data, status, request) {
status_url = request.getResponseHeader('Location');
update_progress(status_url, '#progress div');
error: function() {
alert('Unexpected error');
function update_progress(status_url, status_div) {
// send GET request to status URL
$.getJSON(status_url, function(data) {
// update UI
percent = parseInt(data['current'] * 100 / data['total']);
//Remove the table and install the progress bar
$(status_div).text(percent + '%');
if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
if ('result' in data) {
// show result
console.log('update progress worked!');
// $(status_div.childNodes[3]).text('Result: ' + data['result']);
else {
// something unexpected happened
console.log('something unexpected happened');
// $(status_div.childNodes[3]).text('Result: ' + data['state']);
else {
// rerun in 2 seconds
setTimeout(function() {
update_progress(status_url, status_div);
}, 2000);
from flask import render_template, flash, redirect, session, url_for, request, g, jsonify
from flask.ext.login import login_user, logout_user, current_user, login_required, LoginManager
from flask.ext.bcrypt import check_password_hash
from config import DATABASE_QUERY_TIMEOUT
from datetime import datetime
from app import app, db, lm, celery
from .forms import GetClientsForm, LoginForm, CompanyForm, UpdateCompanyForm
from .models import User, Post, DoesNotExist, Company
import simplejson as json
import json
import time
import random
@app.route('/email_task', methods=['POST'])
def email_task():
#get the unique IDs for the companies to contact and remove
data = json.loads(request.data)
contact = data['contact']
remove = data['remove']
print contact
print remove
task = send_mail_task.apply_async(args=[contact, remove])
return jsonify({}), 202, {'Location': url_for('taskstatus',
def taskstatus(task_id):
task = send_mail_task.AsyncResult(task_id)
if task.state == 'PENDING':
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
elif task.state != 'FAILURE':
response = {
'state': task.state,
'current': task.info.get('current', 0),
'total': task.info.get('total', 1),
'status': task.info.get('status', '')
if 'result' in task.info:
response['result'] = task.info['result']
# something went wrong in the background job
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info), # this is the exception raised
return jsonify(response)
@celery.task(base=DbTask, bind=True)
def send_mail_task(self, contact, remove):
total_contact_companies = len(contact);
verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
message = ''
for num,i in enumerate(contact):
company = Company.select().where(Company.id == i).get()
print 'Contacting ' + company
message = 'E-mailing {} at {}...'.format(company.company_name, company.email_address)
meta={'current': num, 'total': total_contact_companies,
'status': message})
return {'current': 100, 'total': 100, 'status': 'Task completed!',
'result': 42}
