dusty/reporters/email/helper.py (119 lines of code) (raw):

#!/usr/bin/python3
# coding=utf-8

#   Copyright 2019 getcarrier.io
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

"""
    Email helper
"""

import os
import ssl
import smtplib
import traceback

from email import encoders
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

from dusty.tools import log
from dusty.models.error import Error


class EmailHelper:
    """
    Helps to send email

    Every send opens a fresh SMTP session (STARTTLS + login), delivers one
    message and closes the session again. Connection and delivery failures
    are logged and appended to ``context.errors`` instead of being raised.
    """

    def __init__(self, context, server, login, password, port=587, timeout=30):  # pylint: disable=R0913
        self.context = context
        self.server = server
        self.port = port
        self.timeout = timeout
        self.login = login
        self.password = password
        # Active smtplib.SMTP session, or None when not connected
        self.connection = None

    def connect(self):
        """
        Establish connection to SMTP server

        Tries STARTTLS with the default (verifying) SSL context first. On an
        SSL error the attempt is repeated once with an unverified context.
        On any other failure the error is logged, recorded in
        ``context.errors`` and ``self.connection`` is left as ``None``.
        """
        # Drop any session left over from a previous call so it is not leaked
        self._disconnect()
        try:
            try:
                self.connection = self._open_session(ssl.create_default_context())
            except ssl.SSLError:
                # NOTE: this fallback disables certificate verification
                log.warning("SSL error, retrying with unverified SSL context")
                self.connection = self._open_session(
                    ssl._create_unverified_context()  # pylint: disable=W0212
                )
        except Exception:  # pylint: disable=W0703
            log.exception("Failed to connect to SMTP server")
            error = Error(
                tool="EMail",
                error="Failed to connect to SMTP server",
                details=f"```\n{traceback.format_exc()}\n```"
            )
            self.context.errors.append(error)
            self.connection = None

    def send(self, mail_to, subject, html_body="", attachments=None):
        """
        Send mail

        :param mail_to: recipient address or iterable of addresses
        :param subject: message subject
        :param html_body: HTML text of the message
        :param attachments: a path, or an iterable whose items are either a
            path or a ``(filepath, filename)`` tuple
        :raises OSError: when an attachment file cannot be read (attachments
            are loaded before the guarded delivery step)
        """
        recipients = self._as_list(mail_to)
        message = self._build_message(recipients, [], subject, html_body, attachments)
        self._deliver(message, recipients)

    def send_with_cc(self, mail_to, mail_cc, subject, html_body="", attachments=None):  # pylint: disable=R0913
        """
        Send mail with carbon-copy recipients

        :param mail_to: recipient address or iterable of addresses
        :param mail_cc: CC address or iterable of addresses
        :param subject: message subject
        :param html_body: HTML text of the message
        :param attachments: a path, or an iterable whose items are either a
            path or a ``(filepath, filename)`` tuple
        :raises OSError: when an attachment file cannot be read (attachments
            are loaded before the guarded delivery step)
        """
        to_list = self._as_list(mail_to)
        cc_list = self._as_list(mail_cc)
        message = self._build_message(to_list, cc_list, subject, html_body, attachments)
        # CC recipients must be part of the SMTP envelope as well as the header
        self._deliver(message, to_list + cc_list)

    @staticmethod
    def _as_list(value):
        """ Normalize an empty value / single item / iterable into a list """
        if not value:
            return []
        if isinstance(value, (str, os.PathLike)):
            return [value]
        return list(value)

    def _open_session(self, tls_context):
        """ Open an SMTP session: EHLO, STARTTLS with tls_context, EHLO, login """
        connection = smtplib.SMTP(self.server, self.port, timeout=self.timeout)
        try:
            connection.ehlo()
            connection.starttls(context=tls_context)
            connection.ehlo()
            connection.login(self.login, self.password)
        except BaseException:
            # Do not leak the socket of a half-initialized session
            connection.close()
            raise
        return connection

    def _disconnect(self):
        """ Close the current SMTP session (if any) without raising """
        connection, self.connection = self.connection, None
        if connection is None:
            return
        try:
            connection.quit()
        except (smtplib.SMTPException, OSError):
            # quit() raises when the session is already closed or broken;
            # just make sure the socket is released
            connection.close()

    def _build_message(self, mail_to, mail_cc, subject, html_body, attachments):  # pylint: disable=R0913
        """ Build the MIME message with HTML body and optional attachments """
        # NOTE(review): "alternative" is kept from the original code even when
        # attachments are present; "mixed" may be more appropriate - confirm
        message = MIMEMultipart("alternative")
        message["From"] = self.login
        message["To"] = ", ".join(mail_to)
        if mail_cc:
            message["Cc"] = ", ".join(mail_cc)
        message["Subject"] = subject
        message.attach(MIMEText(html_body, "html"))
        for item in self._as_list(attachments):
            if isinstance(item, tuple):
                filepath, filename = item
            else:
                filepath = item
                filename = os.path.basename(os.fspath(item))
            with open(filepath, "rb") as file:
                part = MIMEBase("application", "octet-stream")
                part.set_payload(file.read())
            encoders.encode_base64(part)
            # Let the email package quote/encode the file name properly
            part.add_header("Content-Disposition", "attachment", filename=filename)
            message.attach(part)
        return message

    def _deliver(self, message, recipients):
        """ Connect, send the message to recipients and always disconnect """
        try:
            self.connect()
            if self.connection is None:
                # connect() has already logged and recorded the failure
                return
            self.connection.sendmail(message["From"], recipients, message.as_string())
        except Exception:  # pylint: disable=W0703
            log.exception("Failed to send email")
            error = Error(
                tool="EMail",
                error="Failed to send email",
                details=f"```\n{traceback.format_exc()}\n```"
            )
            self.context.errors.append(error)
        finally:
            self._disconnect()