"""Helpers for sending campaigns and managing subscribers via the Listmonk API.

All endpoints are built from ``settings.LISTMONK_API`` and authenticated with
``settings.LISTMONK_USER`` / ``settings.LISTMONK_PASSWORD``.
"""
import logging

import requests
from django.conf import settings
from django.utils.timezone import datetime, timedelta

logger = logging.getLogger(__name__)

# Sent with every request that carries a JSON body.
headers = {
    'Content-Type': 'application/json;charset=utf-8'
}


def _auth():
    """Return the (user, password) pair used for HTTP basic auth."""
    return (settings.LISTMONK_USER, settings.LISTMONK_PASSWORD)


def send_week(date):
    """Render the weekly digest for the week containing *date* and email it.

    Args:
        date: day inside the wanted week, as a ``'YYYY-MM-DD'`` string.

    Returns:
        The result of :func:`send_email` (``True`` only if the campaign was
        created and started).

    Raises:
        ValueError: if *date* does not match ``'%Y-%m-%d'``.
    """
    # Imported inside the function, as in the original code; presumably this
    # avoids a circular import with ``views`` -- confirm before moving it.
    from . import views

    day = datetime.strptime(date, '%Y-%m-%d')
    monday = views.get_monday(day)
    # NOTE(review): views.week is given the integer parts of the parsed date.
    # Confirm it does not expect zero-padded strings or the Monday instead.
    body = views.week(day.year, day.month, day.day)
    name = 'weekly-digest-%s' % monday
    subject = 'Phantas.ma weekly update %s' % views.format_week(monday)
    return send_email(name, subject, body)


def send_email(name, subject, body):
    '''
    Create a campaign and, if ``settings.LISTMONK_SEND`` is set, start it.

    send_email('weekly digest 2023-09-22', 'Phantas.ma weekly update Sep 23', body)

    Returns ``True`` only when the campaign was created *and* switched to
    ``running``. Returns ``False`` when no lists are configured, when the
    API response carries no campaign id, or when sending is disabled.
    '''
    if not settings.LISTMONK_LISTS:
        return False

    data = {
        'name': name,
        'subject': subject,
        'lists': settings.LISTMONK_LISTS,
        'content_type': 'html',
        'type': 'regular',
        'body': body
    }
    url = settings.LISTMONK_API + 'campaigns'
    auth = _auth()
    r = requests.post(url, json=data, headers=headers, auth=auth).json()
    campaign_id = r.get('data', {}).get('id')
    if not campaign_id:
        logger.error("failed to create campaign %s => %s", name, r)
        return False
    if not settings.LISTMONK_SEND:
        # Campaign exists but is intentionally left unsent.
        return False

    url = settings.LISTMONK_API + 'campaigns/%s/status' % campaign_id
    data = {"status": "running"}
    # The response body is decoded (so a non-JSON reply still raises, as
    # before) but its content is not inspected.
    requests.put(url, json=data, headers=headers, auth=auth).json()
    return True


def email2name(email):
    """Derive a display name from the local part of *email*.

    Dots become spaces and each word is capitalized, e.g.
    ``'jane.doe@example.com'`` -> ``'Jane Doe'``.
    """
    local_part = email.split("@")[0]
    words = local_part.replace(".", " ").split(" ")
    return " ".join(word.capitalize() for word in words)


def add_email(email):
    """Subscribe *email* to ``settings.LISTMONK_LISTS`` unless already known.

    Returns:
        ``True`` if a new subscriber was created; ``False`` if the address
        already exists or the API did not return a subscriber id (the
        latter is logged as an error).
    """
    url = settings.LISTMONK_API + 'subscribers'
    auth = _auth()
    # Let requests URL-encode the query: pasted raw into the URL, a '+' in
    # the address would reach the server as a space and the lookup would miss.
    # Single quotes are doubled so the address stays inside the quoted literal.
    lookup = {
        'list_id': '',
        'query': "email='%s'" % email.replace("'", "''"),
        'page': 1,
        'order_by': 'id',
        'order': 'desc',
    }
    existing = requests.get(url, params=lookup, auth=auth).json()['data']['results']
    if existing:
        return False

    data = {
        "email": email,
        "name": email2name(email),
        "status": "enabled",
        "lists": settings.LISTMONK_LISTS,
    }
    r = requests.post(url, json=data, headers=headers, auth=auth).json()
    if r.get("data", {}).get("id"):
        return True
    logger.error("failed to add email to list %s => %s", email, r)
    return False


def _delete_subscribers(subscriber_ids, auth):
    """Delete each subscriber id, logging those the API did not confirm."""
    url = settings.LISTMONK_API + 'subscribers'
    for subscriber_id in subscriber_ids:
        r = requests.delete(url, params={'id': subscriber_id}, auth=auth).json()
        if not r.get("data"):
            logger.warning("failed to delete subscriber %s => %s", subscriber_id, r)


def delete_bounced_emails():
    """Delete every subscriber referenced by a bounce record.

    Only the first 10000 bounce records are fetched.
    """
    auth = _auth()
    url = settings.LISTMONK_API + 'bounces'
    r = requests.get(url, params={'per_page': 10000}, auth=auth).json()
    # De-duplicate (order preserved) in case one subscriber has several
    # bounce records; a repeated delete would only produce a failure.
    ids = list(dict.fromkeys(
        bounce['subscriber_id'] for bounce in r['data']['results']
    ))
    _delete_subscribers(ids, auth)


def delete_blacklisted_emails():
    """Delete every subscriber whose status is ``'blocklisted'``.

    Only the first 10000 subscribers are inspected.
    """
    auth = _auth()
    url = settings.LISTMONK_API + 'subscribers'
    r = requests.get(url, params={'per_page': 10000}, auth=auth).json()
    ids = [
        subscriber['id']
        for subscriber in r['data']['results']
        if subscriber['status'] == 'blocklisted'
    ]
    _delete_subscribers(ids, auth)