# Source code for emaillib.emaillib

#!/usr/bin/env python2.7
# -*- coding: UTF-8 -*-
# File: emaillib.py
"""Main module file"""

import logging
import os
import smtplib
import re
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
from email.mime.application import MIMEApplication
from email.utils import COMMASPACE, formatdate, parseaddr

# Module metadata
__author__ = 'Costas Tyfoxylos <costas.tyf@gmail.com>'
__docformat__ = 'plaintext'
__date__ = '16-09-2017'

# Base name of the logger used by this module
LOGGER_BASENAME = 'emaillib'
LOGGER = logging.getLogger(LOGGER_BASENAME)
# Attach a no-op handler to the module logger
LOGGER.addHandler(logging.NullHandler())


class SmtpServer(object):
    """A simple wrapper around the built in smtplib capabilities

    Intended usage is connect(), one or more send() calls and a final
    disconnect() when the connection is not needed any more.
    """

    def __init__(self, address, username=None, password=None, tls=True, ssl=False, port=None):
        """Stores the connection settings, no connection is made here

        :param address: The address of the smtp server
        :param username: The username to log in with (optional)
        :param password: The password to log in with (optional)
        :param tls: Whether to issue STARTTLS after connecting
        :param ssl: Whether to connect using smtplib.SMTP_SSL
        :param port: The port to connect to, smtplib default if not set
        """
        self._smtp = None
        self._address = address
        self._tls = tls
        self._ssl = ssl
        self._port = port
        self._username = username
        self._password = password
        self._connected = False
        self._logger = logging.getLogger(__name__)

    @property
    def tls(self):
        """The setting of tls upon instantiation"""
        return self._tls

    @property
    def ssl(self):
        """The setting of ssl upon instantiation"""
        return self._ssl

    @property
    def username(self):
        """The username upon instantiation"""
        return self._username

    @property
    def password(self):
        """The password upon instantiation"""
        return self._password

    @property
    def address(self):
        """The smtp server address upon instantiation"""
        return self._address

    @property
    def port(self):
        """The smtp server port upon instantiation"""
        return self._port

    @property
    def connected(self):
        """The status of connection to the smtp server"""
        return self._connected

    def connect(self):
        """Initializes a connection to the smtp server

        Errors raised by smtplib while connecting, negotiating TLS or
        logging in are not caught here and propagate to the caller.

        :return: True on success
        """
        connection_method = 'SMTP_SSL' if self.ssl else 'SMTP'
        self._logger.debug('Trying to connect via %s', connection_method)
        smtp = getattr(smtplib, connection_method)
        if self.port:
            self._smtp = smtp(self.address, self.port)
        else:
            # No port given, let smtplib pick its default for the class
            self._smtp = smtp(self.address)
        self._smtp.ehlo()
        if self.tls:
            self._smtp.starttls()
            # Identify again over the now encrypted channel
            self._smtp.ehlo()
        self._logger.info('Got smtp connection')
        # Login is attempted only when both credentials are provided
        if self.username and self.password:
            self._logger.info('Logging in')
            self._smtp.login(self.username, self.password)
        self._connected = True
        return True

    def send(self, sender, recipients, cc=None, bcc=None, subject='', body='', attachments=None, content='text'):
        """Sends the email

        The connection is left open after sending so that further messages
        can be sent; call disconnect() to close it.

        :param sender: The sender of the message
        :param recipients: The recipients (To:) of the message
        :param cc: The CC recipients of the message
        :param bcc: The BCC recipients of the message
        :param subject: The subject of the message
        :param body: The body of the message
        :param attachments: The attachments of the message
        :param content: The type of content the message [text/html]
        :return: True on success, False otherwise
        """
        if not self.connected:
            self._logger.error(('Server not connected, cannot send message, '
                                'please connect() first and disconnect() when '
                                'the connection is not needed any more'))
            return False
        try:
            message = Message(sender, recipients, cc, bcc, subject, body, attachments, content)
            self._smtp.sendmail(message.sender, message.recipients, message.as_string)
        except Exception:  # noqa
            # Best effort by design: failures are logged and reported as False
            self._logger.exception('Something went wrong!')
            return False
        self._logger.debug('Done')
        return True

    def disconnect(self):
        """Disconnects from the remote smtp server

        :return: True on success (including when there is no open
            connection to close), False otherwise
        """
        if not self.connected:
            return True
        try:
            self._smtp.close()
        except Exception:  # noqa
            self._logger.exception('Something went wrong!')
            return False
        self._connected = False
        return True
class EasySender(object):
    """A simple wrapper around the SmtpServer object

    Connects, sends a single message and disconnects on every send() call.
    """

    def __init__(self, address, username=None, password=None, tls=False, ssl=True, port=None):
        """Creates the underlying SmtpServer with the provided settings

        :param address: The address of the smtp server
        :param username: The username to log in with (optional)
        :param password: The password to log in with (optional)
        :param tls: Whether to issue STARTTLS after connecting
        :param ssl: Whether to connect over SSL
        :param port: The port to connect to, smtplib default if not set
        """
        self._server = SmtpServer(address, username, password, tls, ssl, port)

    def send(self, sender, recipients, cc=None, bcc=None, subject='', body='', attachments=None, content='text'):
        """Sends the email by connecting and disconnecting after the send

        Errors raised while connecting are not caught here and propagate to
        the caller.

        :param sender: The sender of the message
        :param recipients: The recipients (To:) of the message
        :param cc: The CC recipients of the message
        :param bcc: The BCC recipients of the message
        :param subject: The subject of the message
        :param body: The body of the message
        :param attachments: The attachments of the message
        :param content: The type of content the message [text/html]
        :return: The result of the underlying send, True on success,
            False otherwise
        """
        self._server.connect()
        try:
            return self._server.send(sender, recipients, cc, bcc, subject, body, attachments, content)
        finally:
            # Always attempt to release the connection, even if send raised
            self._server.disconnect()
class Message(object):
    """A model of an email message

    All values are validated and normalized through the property setters
    and the underlying MIME message is built once upon instantiation.
    """

    def __init__(self, sender, recipients, cc=None, bcc=None, subject='', body='', attachments=None, content='text'):
        """Validates the provided values and builds the MIME message

        :param sender: The email address of the sender
        :param recipients: The recipients (To:), a list/tuple or a comma
            delimited string
        :param cc: The CC recipients, same formats as recipients
        :param bcc: The BCC recipients, same formats as recipients
        :param subject: The subject of the message
        :param body: The body of the message
        :param attachments: Paths of files to attach, a list/tuple or a
            comma delimited string
        :param content: The type of content of the message [text/html]
        :raises: ValueError on empty/invalid addresses or invalid content
        """
        self._sender = None
        self._recipients = None
        self._to = []
        self._cc = []
        self._bcc = []
        self._subject = None
        self._body = None
        self._attachments = None
        self._parts = None
        self._content = None
        self._message = None
        # The assignments below go through the validating property setters
        self.sender = sender
        self.to = recipients
        self.cc = cc
        self.bcc = bcc
        self.subject = subject
        self.body = body
        self.attachments = attachments
        self.content = content
        self._setup_message()

    def _setup_message(self):
        """Constructs the actual underlying message with provided values"""
        if self.content == 'html':
            self._message = MIMEMultipart('alternative')
            part = MIMEText(self.body, 'html', 'UTF-8')
        else:
            self._message = MIMEMultipart()
            part = MIMEText(self.body, 'plain', 'UTF-8')
        self._message.preamble = 'Multipart message.\n'
        self._message.attach(part)
        self._message['From'] = self.sender
        self._message['To'] = COMMASPACE.join(self.to)
        if self.cc:
            self._message['Cc'] = COMMASPACE.join(self.cc)
        # No header is written for bcc, those addresses are only exposed
        # through the recipients property
        self._message['Date'] = formatdate(localtime=True)
        self._message['Subject'] = self.subject
        for part in self._parts:
            self._message.attach(part)

    @staticmethod
    def _validate_simple(email):
        """Does a simple validation of an email by matching it to a regexps

        :param email: The email to check
        :return: The valid Email address
        :raises: ValueError if value is not a valid email
        """
        _, address = parseaddr(email)
        if not re.match(r'[^@]+@[^@]+\.[^@]+', address):
            raise ValueError('Invalid email :{email}'.format(email=email))
        return address

    @staticmethod
    def _comma_delimited_to_list(value):
        """Splits a comma delimited string to a list of stripped entries

        Lists and tuples are returned as they are.
        """
        if not isinstance(value, (list, tuple)):
            return [entry.strip() for entry in value.split(',')]
        return value

    @staticmethod
    def _to_text(value):
        """Decodes byte strings as UTF-8, other values are returned as is"""
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    @property
    def sender(self):
        """The email address of the sender"""
        return self._sender

    @sender.setter
    def sender(self, value):
        if not value:
            raise ValueError('Sender cannot be empty.')
        self._sender = self._validate_simple(value)

    @property
    def recipients(self):
        """A list of all recipients of the message"""
        return self.to + self.cc + self.bcc

    def _get_recipients(self, value):
        """Returns a list of validated addresses, empty for a falsy value"""
        recipients = self._comma_delimited_to_list(value) if value else []
        return [self._validate_simple(recipient) for recipient in recipients]

    @property
    def to(self):
        """The main (to) recipients of the message"""
        return self._to

    @to.setter
    def to(self, value):
        if not value:
            raise ValueError('Recipients cannot be empty.')
        self._to = self._get_recipients(value)

    @property
    def cc(self):
        """The cc recipients of the message"""
        return self._cc

    @cc.setter
    def cc(self, value):
        self._cc = self._get_recipients(value)

    @property
    def bcc(self):
        """The bcc recipients of the message"""
        return self._bcc

    @bcc.setter
    def bcc(self, value):
        self._bcc = self._get_recipients(value)

    @property
    def subject(self):
        """The subject of the message"""
        return self._subject

    @subject.setter
    def subject(self, value):
        # Only byte strings need decoding, text is stored unchanged
        self._subject = self._to_text(value)

    @property
    def body(self):
        """The body of the message"""
        return self._body

    @body.setter
    def body(self, value):
        # Only byte strings need decoding, text is stored unchanged
        self._body = self._to_text(value)

    @property
    def content(self):
        """The type of content of the message"""
        return self._content

    @content.setter
    def content(self, value):
        if value.lower() not in ['text', 'html']:
            raise ValueError(('Invalid content type :{content}.'
                              'Allowed ["text", "html"]').format(content=value))
        self._content = value.lower()

    @property
    def attachments(self):
        """A list of attachment names of the message"""
        return self._attachments

    @attachments.setter
    def attachments(self, entries):
        values = []
        self._parts = []
        attachments = self._comma_delimited_to_list(entries) if entries else []
        for attachment in attachments:
            # Read inside a context manager so the file handle is closed
            with open(os.path.expanduser(attachment), 'rb') as handle:
                part = MIMEApplication(handle.read())
            part.add_header('Content-Disposition',
                            'attachment',
                            filename=os.path.basename(attachment))
            self._parts.append(part)
            values.append(attachment)
        self._attachments = values

    @property
    def as_string(self):
        """The string representation of the message"""
        return self._message.as_string()

    def __str__(self):
        """The string representation of the message"""
        return self.as_string