from flask_login import login_user, logout_user from flask import Blueprint, render_template, redirect, url_for, request, flash, current_app from werkzeug.security import generate_password_hash, check_password_hash from flask_login import login_required, current_user, login_manager from .models import User, Conversation from . import db import time import hashlib auth = Blueprint('auth', __name__) def hash_to_digit(instr: str) -> str: outstr = hashlib.md5(instr.encode('utf-8')).hexdigest() outstr = f"{int(outstr, 16) % 1000000:0>6d}" return outstr def gen_activation_code(email: str) -> str: current_time_slot = int(time.time() // 60) s = email + current_app.config['SECRET_KEY'] + str(current_time_slot) activation_code = hash_to_digit(s) return activation_code def check_activation_code(email: str, activation_code: str) -> bool: current_time_slot = int(time.time() // 60) for time_slot in range(current_time_slot-10, current_time_slot+1): s = email + current_app.config['SECRET_KEY'] + str(time_slot) expected_code = hash_to_digit(s) if expected_code == activation_code: return True return False @auth.route('/login') def login(): return render_template('login.html') @auth.route('/login', methods=['POST']) def login_post(): # login code goes here email = request.form.get('email') password = request.form.get('password') remember = True if request.form.get('remember') else False user = User.query.filter_by(email=email).first() # check if the user actually exists # take the user-supplied password, hash it, and compare it to the hashed password in the database if not user or not check_password_hash(user.password, password): flash('请检查登录信息') # if the user doesn't exist or password is wrong, reload the page return redirect(url_for('auth.login')) # if the above check passes, then we know the user has the right credentials login_user(user, remember=remember) return redirect(url_for('main.index')) @auth.route('/activate', methods=['POST']) @login_required def activate(): activation_code = request.form.get('activation_code') if check_activation_code(current_user.email, activation_code): account = User.query.filter_by( id=current_user.id, email=current_user.email, name=current_user.name).first() if account: if db.session.query(User).filter(User.id == account.id).update({"isActivated": True}) and not db.session.commit(): time.sleep(0.05) return redirect(url_for('main.index')) time.sleep(1) flash("激活码不匹配") return redirect(url_for('main.index')) @auth.route('/signup') def signup(): return render_template('signup.html') @auth.route('/signup', methods=['POST']) def signup_post(): # code to validate and add user to database goes here email = request.form.get('email') name = request.form.get('name') password = request.form.get('password') # if this returns a user, then the email already exists in database user = User.query.filter_by(email=email).first() if user: # if a user is found, we want to redirect back to signup page so user can try again flash('此邮箱已注册!') return redirect(url_for('auth.signup')) if not (email): flash('请输入邮箱!') return redirect(url_for('auth.signup')) if not (name): name = email if not (password): flash('请输入密码') return redirect(url_for('auth.signup')) # create a new user with the form data. Hash the password so the plaintext version isn't saved. new_user = User(email=email, name=name, password=generate_password_hash(password, method='sha256'), role='user', isActivated=False) # first user is always admin if not User.query.count(): new_user.role = 'admin' new_user.isActivated = True # add the new user to the database db.session.add(new_user) db.session.commit() activation_code = gen_activation_code(new_user.email) from . import smtp content = f"Hi {new_user.name},欢迎注册!\n" content += f"您的激活码是:\n{activation_code}\n\n" content += "此激活码十分钟内有效,过期请联系管理员激活,谢谢!" smtp.sendmail(new_user.email, "web-gpt激活码", content) return redirect(url_for('auth.login')) @auth.route('/logout') def logout(): if current_user.is_authenticated: logout_user() return redirect(url_for('main.index')) @auth.route('/manage', methods=['POST']) @login_required def manage_post(): method = request.form.get('method') id = request.form.get('id') email = request.form.get('email') name = request.form.get('name') role = request.form.get('role') isActivated = True if request.form.get( 'isActivated') == "true" else False if current_user.role == "admin": if method == "update": account = User.query.filter_by( id=id, email=email, name=name).first() if account: if current_user.id != 1 and account.role == "admin" and role == "user": return "fail 您无权移除管理员!" if current_user.id != 1 and account.isActivated and not isActivated: return "fail 您无权禁用管理员!" if db.session.query(User).filter(User.id == id).update({"role": role, "isActivated": isActivated}) and not db.session.commit(): time.sleep(0.05) return "success" else: time.sleep(0.05) return "fail db_commit" time.sleep(1) return "fail no account" if method == "delete": if role == "admin" and id != current_user.id: return "fail 无法直接删除管理员" account = User.query.filter_by( id=id, email=email, name=name, role=role, isActivated=isActivated).first() if account: db.session.query(Conversation).filter( Conversation.userid == id).delete() db.session.commit() if db.session.query(User).filter(User.id == id).delete() and not db.session.commit(): time.sleep(0.05) return "success" else: time.sleep(0.2) return "fail db_commit" time.sleep(1) return "fail no account" if current_user.id == id and current_user.role == "user": flash("暂时无法更改信息") return redirect(url_for('main.index')) flash("您无权管理其他账户") return redirect(url_for('main.index'))