Added features and security & stability improvements

This commit is contained in:
Evan Niederwerfer 2025-01-26 22:37:34 -05:00
parent baecd53ed8
commit c4383e2550
5 changed files with 241 additions and 103 deletions

9
accounts.py Normal file
View File

@ -0,0 +1,9 @@
from db import *
class accounts:
def register(email, password):
if (db.add_account(email, password)):
print("Registered " + str(email) + " with password " + str(password))
return True
def authenticate_account(email, password):
pass

View File

@ -1,21 +1,35 @@
from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import BaseHTTPRequestHandler, HTTPServer
from socket import * from socket import *
import requests
from sendmail import mail from sendmail import mail
from verification import otp from verification import otp
from db import *
from accounts import *
from urllib.parse import urlparse, parse_qs
import os.path
class MyRequestHandler(BaseHTTPRequestHandler): class MyRequestHandler(BaseHTTPRequestHandler):
base_path = "/home/arcodeskel/Desktop/Verification Platt Discord/"
def do_GET(self): def do_GET(self):
self.send_response(200)
self.send_header("Content-type", "text/html") requested_path = os.path.join(self.base_path, "pages", "index.php")
if self.path == '/':
self.path = '/pages/index.html' if not os.path.abspath(requested_path).startswith(os.path.abspath(self.base_path)):
self.send_response(403)
self.end_headers()
self.wfile.write(b"Forbidden")
return
try: try:
file_to_open = open(self.path[1:]).read() with open(requested_path, 'r') as file:
self.send_response(200) file_to_open = file.read()
self.send_response(200)
except: except:
file_to_open = "File Not Found" file_to_open = "File Not Found"
self.send_response(404) self.send_response(404)
self.end_headers() self.end_headers()
self.wfile.write(bytes(file_to_open, 'utf-8')) self.wfile.write(bytes(file_to_open, 'utf-8'))
@ -24,45 +38,118 @@ class MyRequestHandler(BaseHTTPRequestHandler):
# getting the content length from the header information and then rfile is the POST request, with content_length being the number of bytes it needs to grab (optional for if you want to grab less bytes for whatever reason) # getting the content length from the header information and then rfile is the POST request, with content_length being the number of bytes it needs to grab (optional for if you want to grab less bytes for whatever reason)
content_length = int(self.headers['Content-Length']) content_length = int(self.headers['Content-Length'])
data_input = bytes.decode(self.rfile.read(content_length)) data_input = bytes.decode(self.rfile.read(content_length))
parsed_data = parse_qs(data_input)
if (data_input.startswith("email=")): if (data_input.startswith("email=")):
data_str = str(data_input.replace('%40', '@'))
formatted = data_str.replace('email=', '') email = parsed_data.get('email', [None])[0] # defaults to none if email is not found
mail.send(formatted, mail.gen_code(formatted)) password = parsed_data.get('passwd', [None])[0]
self.path = '/pages/otp.html'
code = mail.gen_code()
db.add_session(email, password, code)
mail.send(email, code)
requested_path = os.path.join(self.base_path, "pages", "otp.php")
if not os.path.abspath(requested_path).startswith(os.path.abspath(self.base_path)):
self.send_response(403)
self.end_headers()
self.wfile.write(b"Forbidden")
return
try: try:
file_to_open = open(self.path[1:]).read() with open(requested_path, 'r') as file:
self.send_response(200) file_to_open = file.read()
if email:
file_to_open = file_to_open.replace('<!-- PREFILL_EMAIL -->', email) # Replace a placeholder in the HTML
self.send_response(200)
except: except:
file_to_open = "File Not Found" file_to_open = "File Not Found"
self.send_response(404) self.send_response(404)
self.end_headers() self.end_headers()
self.wfile.write(bytes(file_to_open, 'utf-8')) self.wfile.write(bytes(file_to_open, 'utf-8'))
if (data_input.startswith("verifEmail=")):
data_str = str(data_input.replace('%40', '@'))
# get email
formatted_verifemail = data_str.replace('verifEmail=', '')
formatted_verifnums = formatted_verifemail.replace('&verifOtp=', '')
email = formatted_verifnums[:-4]
#get otp code
otp_code = data_str[-4:]
print(email)
print(otp)
if (otp.authenticate(email, otp_code)):
print("OK!")
else:
print("BAD!")
if (data_input.startswith("verifEmail=")):
email = parsed_data.get('verifEmail', [None])[0] # defaults to none if email is not found
otp_code = parsed_data.get('verifOtp', [None])[0]
if (db.get_session(email) is not None or False):
if (otp.authenticate_otp(email, otp_code)):
if (accounts.register(email, db.get_session_passwd(email))):
db.del_session(email)
requested_path = os.path.join(self.base_path, "pages", "success.html")
if not os.path.abspath(requested_path).startswith(os.path.abspath(self.base_path)):
self.send_response(403)
self.end_headers()
self.wfile.write(b"Forbidden")
return
try:
with open(requested_path, 'r') as file:
file_to_open = file.read()
self.send_response(200)
except:
file_to_open = "File Not Found"
self.send_response(404)
self.end_headers()
self.wfile.write(bytes(file_to_open, 'utf-8'))
else:
print("error handling")
else:
db.del_session(email)
requested_path = os.path.join(self.base_path, "pages", "success.html")
if not os.path.abspath(requested_path).startswith(os.path.abspath(self.base_path)):
self.send_response(403)
self.end_headers()
self.wfile.write(b"Forbidden")
return
try:
with open(requested_path, 'r') as file:
file_to_open = file.read()
self.send_response(200)
except:
file_to_open = "File Not Found"
self.send_response(404)
self.end_headers()
self.wfile.write(bytes(file_to_open, 'utf-8'))
else:
db.del_session(email)
requested_path = os.path.join(self.base_path, "pages", "success.html")
if not os.path.abspath(requested_path).startswith(os.path.abspath(self.base_path)):
self.send_response(403)
self.end_headers()
self.wfile.write(b"Forbidden")
return
try:
with open(requested_path, 'r') as file:
file_to_open = file.read()
self.send_response(200)
except:
file_to_open = "File Not Found"
self.send_response(404)
self.end_headers()
self.wfile.write(bytes(file_to_open, 'utf-8'))
Handler = MyRequestHandler Handler = MyRequestHandler
if (db.init()):
pass
else:
print("Init returned false, there might be an issue!")
hostName = "localhost" hostName = "localhost"
serverPort = 8080 serverPort = 8080

92
db.py Normal file
View File

@ -0,0 +1,92 @@
import sqlite3
from datetime import datetime, timedelta
class db:
def init():
try:
conn = sqlite3.connect('accounts.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS sessions (email text UNIQUE, password integer, code integer, datedel text)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS accounts (email text UNIQUE, password integer)''')
conn.commit()
conn.close()
return True
except:
return False
def get_session_passwd(email):
conn = sqlite3.connect('accounts.db')
cursor = conn.cursor()
passwd = '''SELECT password FROM sessions WHERE email = ? LIMIT 1'''
cursor.execute(passwd, (email, ))
fetch_passwd = cursor.fetchone()
conn.close()
return fetch_passwd[0]
def del_session(email):
conn = sqlite3.connect('accounts.db')
cursor = conn.cursor()
session = '''DELETE FROM sessions WHERE email = ?'''
cursor.execute(session, (email, ))
conn.commit()
conn.close()
return True
def get_session(email):
exists = '''SELECT email FROM sessions WHERE email = ? LIMIT 1'''
conn = sqlite3.connect('accounts.db')
cursor = conn.cursor()
cursor.execute(exists, (email, ))
result = cursor.fetchone()
conn.close()
return result
def check_date(email):
conn = sqlite3.connect('accounts.db')
cursor = conn.cursor()
past_sql = '''SELECT datedel FROM sessions WHERE email = ? LIMIT 1'''
cursor.execute(past_sql, (email, ))
fetch_past = cursor.fetchone()
past = fetch_past[0]
conn.close()
return past
def fetch_session_code(email):
conn = sqlite3.connect('accounts.db')
cursor = conn.cursor()
match_sql = '''SELECT code FROM sessions WHERE email = ? LIMIT 1'''
cursor.execute(match_sql, (email, ))
fetch_otp = cursor.fetchone()
otp = fetch_otp[0]
conn.close()
return otp
def add_session(email, password, code):
conn = sqlite3.connect('accounts.db')
cursor = conn.cursor()
insert_email = "INSERT OR IGNORE INTO sessions (email) VALUES (?)"
insert_password = "UPDATE sessions SET password = ? WHERE email = ?"
insert_code = "UPDATE sessions SET code = ? WHERE email = ?"
insert_datedel = "UPDATE sessions SET datedel = ? WHERE email = ?"
cursor.execute(insert_email, (email, ))
cursor.execute(insert_password, (password, email, ))
cursor.execute(insert_code, (code, email, ))
cursor.execute(insert_datedel, (datetime.now() + timedelta(minutes=5), email, ))
conn.commit()
conn.close()
return True
def add_account(email, password):
conn = sqlite3.connect('accounts.db')
cursor = conn.cursor()
insert_email = "INSERT OR IGNORE INTO accounts (email) VALUES (?)"
insert_password = "UPDATE accounts SET password = ? WHERE email = ?"
cursor.execute(insert_email, (email, ))
cursor.execute(insert_password, (password, email, ))
conn.commit()
conn.close()
return True

View File

@ -5,53 +5,33 @@ from random import randrange
import sqlite3 import sqlite3
from datetime import datetime, timedelta from datetime import datetime, timedelta
from verification import otp from verification import otp
from db import *
class mail: class mail:
def __init__(self, email): def __init__(self, email):
self.email = email self.email = email
def add_db(email, code): def gen_code():
conn = sqlite3.connect('otp.db') code = randrange(1000, 9999)
cursor = conn.cursor() return code
insert_email = "INSERT OR IGNORE INTO sessions (email) VALUES (?)"
insert_code = "UPDATE sessions SET code = ? WHERE email = ?"
insert_datedel = "UPDATE sessions SET datedel = ? WHERE email = ?"
cursor.execute(insert_email, (email, ))
cursor.execute(insert_code, (code, email, ))
cursor.execute(insert_datedel, (datetime.now() + timedelta(minutes=5), email, ))
conn.commit()
conn.close()
return True
def gen_code(email):
if (otp.check_code(email)):
code = randrange(1000, 9999)
mail.add_db(email, code)
return code
else:
print("Code already in progress!")
def send(email, code): def send(email, code):
try: try:
smtpObj = smtplib.SMTP_SSL('mail.__server__.com', 465) smtpObj = smtplib.SMTP_SSL('mail.example.com', 465)
# Identify yourself to an ESMTP server using EHLO # Identify yourself to an ESMTP server using EHLO
smtpObj.ehlo() smtpObj.ehlo()
# Login to the server (if required) # Login to the server (if required)
smtpObj.login('username', 'passwd') smtpObj.login('email', 'password')
# Send an email # Send an email
msg = EmailMessage() msg = EmailMessage()
msg.set_content(f'You have requested a verification code. Your code is {code}. This code expires in 5 minutes.') msg.set_content(f'You have requested a verification code. Your code is {code}. This code expires in 5 minutes.')
msg['Subject'] = 'UWP Community Verification Code' msg['Subject'] = 'UWP Community Verification Code'
msg['From'] = "" msg['From'] = "email@example.com"
msg['To'] = email msg['To'] = email
smtpObj.send_message(msg) smtpObj.send_message(msg)

View File

@ -1,55 +1,25 @@
import sqlite3 import sqlite3
from datetime import datetime, timedelta from datetime import datetime, timedelta
from db import *
class otp: class otp:
def connection():
try:
conn = sqlite3.connect('otp.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS sessions (email text UNIQUE, code integer, datedel text)''')
conn.commit()
conn.close()
return True
except:
return False
def check_code(email): def check_code(email):
if (otp.connection()): if (db.get_session(email) == None):
exists = '''SELECT email FROM sessions WHERE email = ? LIMIT 1''' return False
conn = sqlite3.connect('otp.db') else:
cursor = conn.cursor() past = db.check_date(email)
cursor.execute(exists, (email, )) present = datetime.now()
result = cursor.fetchone() if (str(present) > past):
if (result == None): db.del_session(email)
conn.close() return False
else: return True
past_sql = '''SELECT datedel FROM sessions WHERE email = ? LIMIT 1'''
cursor.execute(past_sql, (email, ))
fetch_past = cursor.fetchone()
past = fetch_past[0]
present = datetime.now()
if (str(present) > past):
del_sql = '''DELETE FROM sessions WHERE email = ?'''
cursor.execute(del_sql, (email, ))
conn.commit()
conn.close()
return False
return True
def authenticate(email, user_otp): def authenticate_otp(email, user_otp):
if (otp.check_code(email)): if (otp.check_code(email)):
conn = sqlite3.connect('otp.db') if (str(db.fetch_session_code(email)) == str(user_otp)):
cursor = conn.cursor()
match_sql = '''SELECT code FROM sessions WHERE email = ? LIMIT 1'''
cursor.execute(match_sql, (email, ))
fetch_otp = cursor.fetchone()
print(fetch_otp[0])
print(user_otp)
if (str(fetch_otp[0]) == str(user_otp)):
conn.close()
return True return True
else: else:
conn.close()
return False return False
else:
return -1