Files
Projet12/tools.py

406 lines
13 KiB
Python

from menu import return_menu
from models import Collaborator, Customer, Contract, Event, Credentials
from authentication import PasswordTools
from views import View
from sqlalchemy.orm import Session
from sqlalchemy import select, update, insert, delete
from datetime import datetime, date
class Tools:
"""
Generic tools with object as an arg
"""
def __init__(self, db: Session):
self.db = db
self.view = View()
def list(self, object):
"""
Select all objects from DB and return the list
:param object: object in Collaborator, Customer, Contract, Event
:return: list of object
"""
while self.db:
return self.db.execute(select(object)).all()
def filter(self, object, filter):
"""
Select objects from DB where filter and return the list
:param object: object in Collaborator, Customer, Contract, Event
:param filter: (attribute, value):tuple
:return: list of selected object matching filter
"""
item, value = filter
stmt = (select(object).where(**{item: value}))
while self.db:
result = self.db.execute(
select(object).where(**{item: value})).all()
if not result:
self.view.display_error()
self.view.display_results(result)
class CollaboratorTools:
"""
Interface to manage Collaborator object in DB
"""
def __init__(self, db: Session):
self.db = db
self.view = View()
self.pw_tools = PasswordTools(self.db)
def get_id_by_name(self, username):
collaborator = self.db.execute(
select(Collaborator).where(Collaborator.name == username)).scalar()
if not collaborator:
return None
return collaborator.id
def get_by_team_id(self, team_id):
collaborator = self.db.execute(
select(Collaborator).where(Collaborator.team_id == team_id)).all()
return collaborator
def list(self):
result = self.db.execute(select(Collaborator)).all()
self.view.display_results(result)
def create(self) -> None:
"""
Create a new collaborator with minimum information
:return: None; creates object in DB
"""
pwd_tools = PasswordTools(self.db)
collaborator = self.view.prompt_for_collaborator()
print(collaborator)
new_collab = Collaborator(
name=collaborator['name'],
email=collaborator['email'],
phone=collaborator['phone'],
team_id=collaborator['team_id']
)
self.db.add(new_collab)
self.db.commit()
new_collab_pwd = Credentials(collaborator_id=new_collab.id,
password_hash=pwd_tools.hash("123456"))
self.db.add(new_collab_pwd)
self.db.commit()
self.view.display_confirm("collaborator", new_collab.id)
def update(self, my_id) -> None:
"""
Update a collaborator; asks for field to change and value
:param my_id: id of the collaborator to modify
:return: None; Update entry in DB
"""
pwd_tools = PasswordTools(self.db)
collab = self.db.get(Collaborator, my_id)
item, value = self.view.prompt_for_collaborator_update()
stmt = (update(Collaborator).where(Collaborator.id == my_id).values(
**{item: value}))
if item == 'password':
if not self.db.execute(select(Credentials).where(
Credentials.collaborator_id == my_id)).all():
stmt = (insert(Credentials).values(
collaborator_id=my_id,
password_hash=pwd_tools.hash(value)))
else:
stmt = (update(Credentials).where(
Credentials.collaborator_id == my_id).values(
password_hash=pwd_tools.hash(value)))
self.db.execute(stmt)
self.db.commit()
self.view.display_change(collab.name, item, value)
def delete(self, my_id) -> None:
"""
Delete a collaborator
:param my_id: id of the collaborator to delete
:return: None; deletes entry in DB
"""
stmt = delete(Collaborator).where(Collaborator.id == my_id)
self.db.execute(stmt)
self.db.commit()
def get_team_by_name(self, username: str) -> int | None:
"""
Check if username exists, returns team_id to check permission further
:param username: string, Collaborator.username
:return: None|team_id:int
"""
ret = self.db.execute(
select(Collaborator).where(Collaborator.name == username)).scalar()
if ret is None:
self.view.display_error()
return ret
else:
return ret.team_id
def list_by_team_id(self, my_id):
pass
def list_by_contract_id(self, my_id):
pass
def list_by_event_id(self, my_id):
pass
class CustomerTools:
"""
Interface to manage Customer object in DB
"""
def __init__(self, db: Session):
self.db = db
self.view = View()
def list(self):
result = self.db.execute(select(Customer)).all()
self.view.display_results(result)
return result
def create(self, user_id) -> None:
"""
Create a new customer with minimum information
:return: None; creates object in DB
"""
customer = self.view.prompt_for_customer()
new_customer = Customer(
name=customer['name'],
email=customer['email'],
phone=customer['phone'],
company=customer['company'],
commercial_id=user_id,
)
self.db.add(new_customer)
self.db.commit()
self.view.display_confirm("customer", new_customer.id)
def update(self, my_id):
"""
Update a customer; asks for field to change and value
:param my_id: id of the collaborator to modify
:return: None; Update entry in DB
"""
cust = self.db.get(Customer, my_id)
item, value = self.view.prompt_for_customer_update()
stmt = (update(Customer).where(Customer.id == my_id).values(
**{item: value}, last_update=datetime.now()))
self.db.execute(stmt)
self.db.commit()
self.view.display_change(cust.name, item, value)
def delete(self, my_id) -> None:
"""
Delete a customer
:param my_id: id of the customer to delete
:return: None; deletes entry in DB
"""
stmt = delete(Customer).where(Customer.id == my_id)
self.db.execute(stmt)
self.db.commit()
def get_by_commercial_id(self, my_id):
"""
Check if commercial exists, then returns a list
:param my_id: a commercial id
:return: a list of Customer
"""
if self.db.get(Collaborator, my_id):
ret = self.db.execute(
select(Customer).where(Customer.commercial_id == my_id)).all()
if ret is None:
# print({'message': "No customer found"})
self.view.display_error()
return ret
else:
# print({'message': "No commercial with this id"})
self.view.display_error()
return None
class ContractTools:
"""
Interface to manage Contract object in DB
"""
def __init__(self, db: Session):
self.db = db
self.view = View()
def list(self):
"""
List all contracts from DB
:return: a list of Contract objects
"""
result = self.db.execute(select(Contract)).all()
self.view.display_results(result)
def signed(self):
"""
List only Contract where signed is True (or 1)
:return: a list of Contract objects
"""
result = self.db.execute(
select(Contract).where(Contract.signed == 1))
self.view.display_results(result)
def not_signed(self):
"""
List only Contract where signed is False (or 0)
:return: a list of Contract objects
"""
result = self.db.execute(
select(Contract).where(Contract.signed == 0))
self.view.display_results(result)
def create(self,
customer_options,
commercial_options) -> None:
"""
Create a new contracts with minimum information
:return: None; creates object in DB
"""
contract = self.view.prompt_for_contract(customer_options,
commercial_options)
if not contract['amount']:
contract['amount'] = 0
new_contract = Contract(
customer_id=contract['customer'],
commercial_id=contract['commercial'],
amount=contract['amount'],
)
self.db.add(new_contract)
self.db.commit()
self.view.display_confirm("contract", new_contract.id)
def update(self, my_id,
customer_options,
commercial_options,
event_options):
"""
Update a contract; asks for field to change and value
:param my_id: id of the contract to modify
:return: None; Update entry in DB
"""
item, value = self.view.prompt_for_contract_update()
if item == 'signed':
if value.lower() == "yes":
value = 1
else:
value = 0
# stmt = (update(Contract).where(Contract.id == my_id).values(
# signed=is_signed))
if item == 'customer_id':
value = return_menu(customer_options)
if item == 'commercial_id':
value = return_menu(commercial_options)
if item == 'event_id':
value = return_menu(event_options)
stmt = (update(Contract).where(Contract.id == my_id).values(
**{item: value}))
self.db.execute(stmt)
self.db.commit()
self.view.display_change("Contract"+str(my_id), item, value)
def delete(self, my_id) -> None:
"""
Delete a contract
:param my_id: id of the contract to delete
:return: None; deletes entry in DB
"""
stmt = delete(Contract).where(Contract.id == my_id)
self.db.execute(stmt)
self.db.commit()
class EventTools:
"""
Interface to manage Event object in DB
"""
def __init__(self, db: Session):
self.db = db
self.view = View()
def list(self):
"""
List all events from DB
:return: list of Event objects
"""
result = self.db.execute(select(Event)).all()
self.view.display_results(result)
def filter(self, field, value):
"""
Retrieve only event with specified field
:param field: Event.field
:return: display a list of Event
"""
result = self.db.execute(
select(Event).filter_by(**{field: value})).all()
if not result:
self.view.display_error()
else:
self.view.display_results(result)
def filter_owned(self, user_id):
"""
List only events where support_id is the id of the user logged in
:param user_id: the connected user id
:return: list of Event objects
"""
result = self.db.execute(
select(Event).where(Event.support_id == user_id)).all()
if not result:
self.view.display_error()
return None
self.view.display_results(result)
return result
def create(self) -> None:
"""
Create a new event with minimum information
:return: None; creates object in DB
"""
event = self.view.prompt_for_event()
new_event = Event(
name=event['name'],
contract_id=event['contract_id'],
customer_id=event['customer_id'],
customer_contact=event['customer_contact'],
date_start=event['date_start'],
date_end=event['date_end'],
location=event['location'],
)
self.db.add(new_event)
self.db.commit()
self.view.display_confirm("event", new_event.id)
def update(self, my_id, support_options):
"""
Update an event in DB; asks for field to change and value
:param my_id: id of the event to modify
:return: None; Update entry in DB
"""
event = self.db.get(Event, my_id)
item, value = self.view.prompt_for_event_update(support_options)
if item == 'support_id':
value = return_menu(support_options)
stmt = (update(Event).where(Event.id == my_id).values(
**{item: value}))
self.db.execute(stmt)
self.db.commit()
self.view.display_change(event.name, item, value)
def delete(self, my_id) -> None:
"""
Delete an event
:param my_id: id of the event to delete
:return: None; deletes entry in DB
"""
stmt = delete(Event).where(Event.id == my_id)
self.db.execute(stmt)
self.db.commit()