406 lines
13 KiB
Python
406 lines
13 KiB
Python
from menu import return_menu
|
|
from models import Collaborator, Customer, Contract, Event, Credentials
|
|
from authentication import PasswordTools
|
|
from views import View
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import select, update, insert, delete
|
|
from datetime import datetime, date
|
|
|
|
|
|
|
|
class Tools:
    """
    Generic query helpers that take the model class as an argument.

    The same two methods serve Collaborator, Customer, Contract and Event;
    the caller passes the class to query.
    """

    def __init__(self, db: Session):
        """
        :param db: SQLAlchemy session used to run every query
        """
        self.db = db
        self.view = View()

    def list(self, object):
        """
        Select all objects from DB and return the list

        :param object: object in Collaborator, Customer, Contract, Event
        :return: list of result rows, or None when no session is set
        """
        # The original spelled this guard ``while self.db: return ...``; it
        # never iterated, so a plain ``if`` states the intent.
        if not self.db:
            return None
        return self.db.execute(select(object)).all()

    def filter(self, object, filter):
        """
        Select objects from DB where filter and return the list

        An error is reported through the view when nothing matches;
        otherwise the matching rows are displayed.

        :param object: object in Collaborator, Customer, Contract, Event
        :param filter: (attribute, value):tuple
        :return: list of selected object matching filter (possibly empty),
            or None when no session is set
        """
        item, value = filter
        if not self.db:
            return None
        # ``where()`` only accepts SQL expressions; keyword criteria such as
        # ``name="x"`` must go through ``filter_by()``.
        result = self.db.execute(
            select(object).filter_by(**{item: value})).all()
        # Run the query exactly once: the previous ``while self.db`` loop had
        # no exit and would have repeated the query forever.
        if not result:
            self.view.display_error()
        else:
            self.view.display_results(result)
        return result
|
|
|
|
|
|
class CollaboratorTools:
    """
    Interface to manage Collaborator object in DB
    """

    # NOTE(review): every new collaborator is created with this same,
    # well-known password. The value is kept as-is so existing workflows do
    # not break, but it should be replaced by a generated one-time password
    # (e.g. via ``secrets``) or a forced change at first login.
    DEFAULT_PASSWORD = "123456"

    def __init__(self, db: Session):
        """
        :param db: SQLAlchemy session used to run every query
        """
        self.db = db
        self.view = View()
        self.pw_tools = PasswordTools(self.db)

    def get_id_by_name(self, username):
        """
        Look up a collaborator by name and return its id

        :param username: value compared with Collaborator.name
        :return: the collaborator id, or None when no collaborator matches
        """
        collaborator = self.db.execute(
            select(Collaborator).where(Collaborator.name == username)
        ).scalar()
        if not collaborator:
            return None
        return collaborator.id

    def get_by_team_id(self, team_id):
        """
        Select the collaborators belonging to a team

        :param team_id: value compared with Collaborator.team_id
        :return: list of result rows (empty when the team has no member)
        """
        return self.db.execute(
            select(Collaborator).where(Collaborator.team_id == team_id)
        ).all()

    def list(self):
        """
        Select all collaborators and display them through the view

        :return: None
        """
        result = self.db.execute(select(Collaborator)).all()
        self.view.display_results(result)

    def create(self) -> None:
        """
        Create a new collaborator with minimum information

        The collaborator and its initial credentials are written in a single
        transaction.

        :return: None; creates object in DB
        """
        collaborator = self.view.prompt_for_collaborator()
        new_collab = Collaborator(
            name=collaborator['name'],
            email=collaborator['email'],
            phone=collaborator['phone'],
            team_id=collaborator['team_id'],
        )
        self.db.add(new_collab)
        # flush() sends the INSERT so the primary key is available for the
        # credentials row, without committing yet: a failure below no longer
        # leaves a collaborator persisted without credentials.
        self.db.flush()
        new_collab_pwd = Credentials(
            collaborator_id=new_collab.id,
            password_hash=self.pw_tools.hash(self.DEFAULT_PASSWORD),
        )
        self.db.add(new_collab_pwd)
        self.db.commit()
        self.view.display_confirm("collaborator", new_collab.id)

    def update(self, my_id) -> None:
        """
        Update a collaborator; asks for field to change and value

        A 'password' change is stored as a hash in Credentials (inserted when
        the collaborator has no credentials row yet); any other field is
        updated on the Collaborator row itself.

        :param my_id: id of the collaborator to modify
        :return: None; Update entry in DB
        """
        collab = self.db.get(Collaborator, my_id)
        if collab is None:
            # Unknown id: report it before prompting, instead of failing on
            # ``collab.name`` after the transaction has been committed.
            self.view.display_error()
            return
        item, value = self.view.prompt_for_collaborator_update()
        if item == 'password':
            password_hash = self.pw_tools.hash(value)
            has_credentials = self.db.execute(
                select(Credentials).where(
                    Credentials.collaborator_id == my_id)).all()
            if not has_credentials:
                stmt = insert(Credentials).values(
                    collaborator_id=my_id,
                    password_hash=password_hash)
            else:
                stmt = update(Credentials).where(
                    Credentials.collaborator_id == my_id).values(
                    password_hash=password_hash)
            # Never echo the plaintext password back to the terminal.
            shown_value = "********"
        else:
            stmt = update(Collaborator).where(
                Collaborator.id == my_id).values(**{item: value})
            shown_value = value
        self.db.execute(stmt)
        self.db.commit()
        self.view.display_change(collab.name, item, shown_value)

    def delete(self, my_id) -> None:
        """
        Delete a collaborator

        :param my_id: id of the collaborator to delete
        :return: None; deletes entry in DB
        """
        stmt = delete(Collaborator).where(Collaborator.id == my_id)
        self.db.execute(stmt)
        self.db.commit()

    def get_team_by_name(self, username: str) -> int | None:
        """
        Check if username exists, returns team_id to check permission further

        An error is reported through the view when the name is unknown.

        :param username: string, compared with Collaborator.name
        :return: None|team_id:int
        """
        ret = self.db.execute(
            select(Collaborator).where(Collaborator.name == username)
        ).scalar()
        if ret is None:
            self.view.display_error()
            return None
        return ret.team_id

    def list_by_team_id(self, my_id):
        """Placeholder: not implemented, does nothing and returns None."""
        pass

    def list_by_contract_id(self, my_id):
        """Placeholder: not implemented, does nothing and returns None."""
        pass

    def list_by_event_id(self, my_id):
        """Placeholder: not implemented, does nothing and returns None."""
        pass
|
|
|
|
|
|
class CustomerTools:
    """
    Interface to manage Customer object in DB
    """

    def __init__(self, db: Session):
        """
        :param db: SQLAlchemy session used to run every query
        """
        self.db = db
        self.view = View()

    def list(self):
        """
        Select all customers, display them through the view and return them

        :return: list of result rows
        """
        result = self.db.execute(select(Customer)).all()
        self.view.display_results(result)
        return result

    def create(self, user_id) -> None:
        """
        Create a new customer with minimum information

        :param user_id: id stored as the customer's commercial_id
        :return: None; creates object in DB
        """
        customer = self.view.prompt_for_customer()
        new_customer = Customer(
            name=customer['name'],
            email=customer['email'],
            phone=customer['phone'],
            company=customer['company'],
            commercial_id=user_id,
        )
        self.db.add(new_customer)
        self.db.commit()
        self.view.display_confirm("customer", new_customer.id)

    def update(self, my_id):
        """
        Update a customer; asks for field to change and value

        last_update is set to the current time along with the chosen field.

        :param my_id: id of the customer to modify
        :return: None; Update entry in DB
        """
        cust = self.db.get(Customer, my_id)
        if cust is None:
            # Unknown id: report it before prompting, instead of failing on
            # ``cust.name`` after the transaction has been committed.
            self.view.display_error()
            return
        item, value = self.view.prompt_for_customer_update()
        # Built as a dict so that an explicit 'last_update' edit overrides
        # the automatic timestamp instead of raising a duplicate-keyword
        # TypeError.
        changes = {'last_update': datetime.now(), item: value}
        stmt = update(Customer).where(Customer.id == my_id).values(**changes)
        self.db.execute(stmt)
        self.db.commit()
        self.view.display_change(cust.name, item, value)

    def delete(self, my_id) -> None:
        """
        Delete a customer

        :param my_id: id of the customer to delete
        :return: None; deletes entry in DB
        """
        stmt = delete(Customer).where(Customer.id == my_id)
        self.db.execute(stmt)
        self.db.commit()

    def get_by_commercial_id(self, my_id):
        """
        Check if commercial exists, then returns a list

        An error is reported through the view when the commercial does not
        exist or has no customer.

        :param my_id: a commercial id
        :return: a list of Customer rows (possibly empty), or None when no
            collaborator has this id
        """
        if not self.db.get(Collaborator, my_id):
            # No commercial with this id
            self.view.display_error()
            return None
        ret = self.db.execute(
            select(Customer).where(Customer.commercial_id == my_id)).all()
        # ``.all()`` returns a list, never None, so emptiness is the test
        # that detects "no customer found".
        if not ret:
            self.view.display_error()
        return ret
|
|
|
|
|
|
class ContractTools:
    """
    Interface to manage Contract object in DB
    """

    def __init__(self, db: Session):
        """
        :param db: SQLAlchemy session used to run every query
        """
        self.db = db
        self.view = View()

    def list(self):
        """
        List all contracts from DB

        :return: a list of Contract rows (also displayed through the view)
        """
        result = self.db.execute(select(Contract)).all()
        self.view.display_results(result)
        return result

    def signed(self):
        """
        List only Contract where signed is True (or 1)

        :return: a list of Contract rows (also displayed through the view)
        """
        # ``.all()`` materialises the rows so the view receives a list, like
        # every other listing method, rather than an unconsumed Result.
        result = self.db.execute(
            select(Contract).where(Contract.signed == 1)).all()
        self.view.display_results(result)
        return result

    def not_signed(self):
        """
        List only Contract where signed is False (or 0)

        :return: a list of Contract rows (also displayed through the view)
        """
        result = self.db.execute(
            select(Contract).where(Contract.signed == 0)).all()
        self.view.display_results(result)
        return result

    def create(self,
               customer_options,
               commercial_options) -> None:
        """
        Create a new contract with minimum information

        :param customer_options: choices handed to the contract prompt
        :param commercial_options: choices handed to the contract prompt
        :return: None; creates object in DB
        """
        contract = self.view.prompt_for_contract(customer_options,
                                                 commercial_options)
        new_contract = Contract(
            customer_id=contract['customer'],
            commercial_id=contract['commercial'],
            # An empty or missing amount is stored as 0.
            amount=contract['amount'] or 0,
        )
        self.db.add(new_contract)
        self.db.commit()
        self.view.display_confirm("contract", new_contract.id)

    def update(self, my_id,
               customer_options,
               commercial_options,
               event_options):
        """
        Update a contract; asks for field to change and value

        For 'signed' the answer "yes" (any case) is stored as 1 and anything
        else as 0. For the id fields the value is picked from the matching
        options menu instead of the prompted value.

        :param my_id: id of the contract to modify
        :param customer_options: menu choices used for 'customer_id'
        :param commercial_options: menu choices used for 'commercial_id'
        :param event_options: menu choices used for 'event_id'
        :return: None; Update entry in DB
        """
        item, value = self.view.prompt_for_contract_update()
        if item == 'signed':
            value = 1 if value.lower() == "yes" else 0
        elif item == 'customer_id':
            value = return_menu(customer_options)
        elif item == 'commercial_id':
            value = return_menu(commercial_options)
        elif item == 'event_id':
            value = return_menu(event_options)
        stmt = update(Contract).where(Contract.id == my_id).values(
            **{item: value})
        self.db.execute(stmt)
        self.db.commit()
        self.view.display_change("Contract"+str(my_id), item, value)

    def delete(self, my_id) -> None:
        """
        Delete a contract

        :param my_id: id of the contract to delete
        :return: None; deletes entry in DB
        """
        stmt = delete(Contract).where(Contract.id == my_id)
        self.db.execute(stmt)
        self.db.commit()
|
|
|
|
|
|
class EventTools:
    """
    Interface to manage Event object in DB
    """

    def __init__(self, db: Session):
        """
        :param db: SQLAlchemy session used to run every query
        """
        self.db = db
        self.view = View()

    def list(self):
        """
        List all events from DB

        :return: list of Event rows (also displayed through the view)
        """
        result = self.db.execute(select(Event)).all()
        self.view.display_results(result)
        return result

    def filter(self, field, value):
        """
        Retrieve only event with specified field

        :param field: name of the Event attribute to compare
        :param value: value the attribute must equal
        :return: None; displays the matching events, or an error when
            nothing matches
        """
        result = self.db.execute(
            select(Event).filter_by(**{field: value})).all()
        if not result:
            self.view.display_error()
        else:
            self.view.display_results(result)

    def filter_owned(self, user_id):
        """
        List only events where support_id is the id of the user logged in

        :param user_id: the connected user id
        :return: list of Event rows, or None (after reporting an error
            through the view) when the user has no event
        """
        result = self.db.execute(
            select(Event).where(Event.support_id == user_id)).all()
        if not result:
            self.view.display_error()
            return None
        self.view.display_results(result)
        return result

    def create(self) -> None:
        """
        Create a new event with minimum information

        :return: None; creates object in DB
        """
        event = self.view.prompt_for_event()
        new_event = Event(
            name=event['name'],
            contract_id=event['contract_id'],
            customer_id=event['customer_id'],
            customer_contact=event['customer_contact'],
            date_start=event['date_start'],
            date_end=event['date_end'],
            location=event['location'],
        )
        self.db.add(new_event)
        self.db.commit()
        self.view.display_confirm("event", new_event.id)

    def update(self, my_id, support_options):
        """
        Update an event in DB; asks for field to change and value

        For 'support_id' the value is picked from the support_options menu
        instead of the prompted value.

        :param my_id: id of the event to modify
        :param support_options: choices passed to the prompt and used as the
            menu for 'support_id'
        :return: None; Update entry in DB
        """
        event = self.db.get(Event, my_id)
        if event is None:
            # Unknown id: report it before prompting, instead of failing on
            # ``event.name`` after the transaction has been committed.
            self.view.display_error()
            return
        item, value = self.view.prompt_for_event_update(support_options)
        if item == 'support_id':
            value = return_menu(support_options)
        stmt = update(Event).where(Event.id == my_id).values(
            **{item: value})
        self.db.execute(stmt)
        self.db.commit()
        self.view.display_change(event.name, item, value)

    def delete(self, my_id) -> None:
        """
        Delete an event

        :param my_id: id of the event to delete
        :return: None; deletes entry in DB
        """
        stmt = delete(Event).where(Event.id == my_id)
        self.db.execute(stmt)
        self.db.commit()
|