048-autorization-server¶
Description
Авторизационный сервис. Написан для СУПа, затем стал использоваться как основной авторизационный сервис для внутренних нужд.
python3.7
flask_jwt_extended
docker
kubernetes
перейти на страницу проекта в git
Зависит от сервисов:
example user record
{
"_id" : ObjectId("************************"),
"username" : "+791********",
"password" : "********",
"user" : "******** *********",
"user_claims" : {
"root" : false,
"domain1.oboi.ru" : {
"any" : "admin"
},
"domain2.oboi.ru" : {
"372" : "seller",
"371" : "seller"
}
}
}
/token/login Получение токенов
/token/refresh Bearer token Обновление access токена
нет понимания зачем нужна доп проверка access токена
def checkAccess(func):
def wrapper(*args, **kwargs):
access_token = request.headers.get('access_token')
if not access_token:
return func(*args, **kwargs)
try:
access_token = jwt.decode(access_token, E('ENV_JWT_SECRET_KEY'),
algorithms='HS256')
except jwt.exceptions.ExpiredSignatureError:
return func(*args, **kwargs)
time = (datetime.timestamp(datetime.now())
- access_token.get('iat',
datetime.timestamp(datetime.now())))
if time > 5:
return func(*args, **kwargs)
return json.dumps({
'response': 'between calls <= 5s'},
default=str), 429
return wrapper
/sms/restore_pass Запрос на восстановление пароля
/sms/change_pass Изменение пароля через СМС код
/user/change_pass Bearer token Изменение пароля
/user/claims Bearer token Получение claims
/user/recovery
???
BUG
/admin/add_shop
ROOT Bearer token Создание или обновление любых полей записи. НЕБЕЗОПАСНО
Quote
предпочтительно выпилить
/admin/add_upd_user
Bearer token Мясо - нет комментариев
BUG
Нужно переписать - донтридабельно
@app.route('/admin/add_upd_user', methods=['POST'], endpoint="user")
@jwt_required
def add_upd_user():
domain_origin = request.headers.get("origin", '')
if demo_key in domain_origin:
return json.dumps({'response': 'ошибка доступа'}, default=str), 403
data = read_data(request)
if not data.get('username') and (not data.get('claims') or not data.get('update')):
return json.dumps({'response': 'Bad Request'}, default=str), 400
author = User(get_jwt_identity(), db_data=False, claims=get_jwt_claims())
# print(author.__dict__)
user = User(data.get('username'))
if data.get('update'):
if (not author.check_domain(domain_origin, True)
or not user.check_domain(domain_origin)):
return json.dumps({'response': 'ошибка доступа'}, default=str), 403
user.update_user_data(name=data.get('update').get('username'),
name_full=data.get('update').get('name'))
if data.get('claims'):
if (hasattr(user, 'password') and user.password) or (
hasattr(user, 'claims') and user.claims):
user.update(author, data.get('claims'))
else:
user.create(author, data)
return json.dumps({'response': user.claims}, default=str), 200
ссылки на метод в СУПе