Compare commits

...

21 Commits

Author SHA1 Message Date
37ffa1d346 Reformat code 2020-10-19 10:41:40 -05:00
3060a9fafb Write README.md 2020-10-16 18:56:31 -05:00
175c464f66 Add purchases and uses pages 2020-10-08 23:49:14 -05:00
fa1361e13f Change quantities page to have a dedicated reset button 2020-10-08 23:48:51 -05:00
ba9bef5c8f Gracefully catch IntegrityError when a database constraint is violated 2020-10-08 10:10:11 -05:00
91c2017759 Modify create_location to return 'api_endpoints' (plural) 2020-10-07 10:33:47 -05:00
8e44280e7d Revert "Change searching to be case insensitive"
This reverts commit 97373e0ae2.
2020-10-07 10:15:53 -05:00
97373e0ae2 Change searching to be case insensitive 2020-10-06 19:56:32 -05:00
11f21020b8 Add Visual Studio Code to .gitignore 2020-10-06 16:18:51 -05:00
df6e64afc3 Add search box for location to quantities page 2020-10-05 23:16:36 -05:00
82f37f4b25 Add products and quantities pages 2020-10-05 20:10:43 -05:00
b809edd49d Clean up docstrings 2020-10-05 19:29:07 -05:00
3d7edab395 Add trailing newline to README.md 2020-10-05 19:26:35 -05:00
951673926b Add postgres.ini.default 2020-10-05 19:25:03 -05:00
4bb70b207e Finish API 2020-10-05 19:22:24 -05:00
8c635adcc7 Reformat to follow PEP standards 2020-10-05 01:07:22 -05:00
8ae91f7ffa Update all routes to use SQLAlchemy 2020-10-05 01:00:50 -05:00
af9cdf850c Begin rewrite from psycopg2 to SQLAlchemy 2020-10-04 21:12:09 -05:00
ec2974eef3 Remove old commented-out code 2020-10-04 15:05:10 -05:00
e0f648383c Add description to block comment in webapi.py 2020-10-04 11:20:08 -05:00
7bdd4b5cdd Remove versiontest.py 2020-10-04 11:19:53 -05:00
13 changed files with 864 additions and 299 deletions

8
.gitignore vendored
View File

@ -129,5 +129,13 @@ dmypy.json
# Pyre type checker
.pyre/
# ---> VisualStudioCode
.vscode/*
#!.vscode/settings.json
#!.vscode/tasks.json
#!.vscode/launch.json
#!.vscode/extensions.json
*.code-workspace
# InvMan custom
postgres.ini

6
.pylintrc Normal file
View File

@ -0,0 +1,6 @@
[TYPECHECK]
ignored-modules=flask, flask_api, sqlalchemy
generated-members=session.*
[MESSAGES CONTROL]
disable=pointless-string-statement

202
README.md
View File

@ -1,3 +1,203 @@
# InvMan
Inventory Manager
## Table of Contents
1. [What is InvMan?](#what-is-invman)
2. [API Usage](#api-usage)
1. [GET Requests](#get-requests-getting-information)
2. [POST Requests](#post-requests-creating-objects)
## What is InvMan?
InvMan is a Flask app which provides a web API and web UI to keep inventory using a PostgreSQL backend.
---
## API Usage
### GET requests (getting information)
---
Please keep in mind:
* Text surrounded by `[` and `]` are parameters which should be replaced.
* Names are case-sensitive.
Get a list of locations
```plaintext
GET /api/v1/locations
```
Get information about a location
```plaintext
GET /api/v1/location/name/[location-name]
```
Get the quantities of all products at a location
```plaintext
GET /api/v1/location/[location-name]/quantities
```
Get the quantity of a product (by UPC) at a location
```plaintext
GET /api/v1/location/[location-name]/quantity/upc/[product-upc]
```
Get the quantity of a product (by name) at a location
```plaintext
GET /api/v1/location/[location-name]/quantity/name/[product-name]
```
Get a list of products
```plaintext
GET /api/v1/products
```
Get information about a product (by UPC)
```plaintext
GET /api/v1/product/upc/[product-upc]
```
Get information about a product (by name)
```plaintext
GET /api/v1/product/name/[product-name]
```
Get a list of brands
```plaintext
GET /api/v1/brands
```
Get information about a brand
```plaintext
GET /api/v1/brand/name/[brand-name]
```
Get a list of units
```plaintext
GET /api/v1/units
```
Get information about a unit (by name)
```plaintext
GET /api/v1/unit/name/[unit-name]
```
Get a list of purchases
```plaintext
GET /api/v1/purchases
```
Get information about a purchase (by id)
```plaintext
GET /api/v1/purchase/id/[purchase-id]
```
Get a list of uses
```plaintext
GET /api/v1/uses
```
Get information about a use (by id)
```plaintext
GET /api/v1/purchase/id/[use-id]
```
### POST Requests (creating objects)
---
Create a location
```plaintext
POST /api/v1/create_location
```
Request form:
| Argument Name | Description | Required? |
| ------------- | --------------------------- | --------- |
| name | Name of the location | Yes |
| description | Description of the location | No |
---
Create a brand
```plaintext
POST /api/v1/create_brand
```
Request form:
| Argument Name | Description | Required? |
| ------------- | ------------------------ | --------- |
| name | Name of the brand | Yes |
| description | Description of the brand | No |
---
Create a unit
```plaintext
POST /api/v1/create_unit
```
Request form:
| Argument Name | Description | Required? |
| ------------- | ----------------------- | --------- |
| name | Name of the unit | Yes |
| description | Description of the unit | No |
---
Create a product
```plaintext
POST /api/v1/create_product
```
Request form:
| Argument Name | Description | Required? |
| ------------- | ----------------------------- | --------- |
| upc | UPC of the product | Yes |
| brand | Brand which makes the product | Yes |
| name | Name of the product | Yes |
| size | Size of the product | Yes |
| sizeunit | Unit used in `size` | Yes |
| description | Description of the product | No |
---
Create a purchase
```plaintext
POST /api/v1/create_purchase
```
| Argument Name | Description | Required? |
| ------------- | -------------------------------------- | --------- |
| upc | UPC of the product which was purchased | Yes |
| quantity | Quantity of the product purchased | Yes |
| date | Date of this purchase | Yes |
| location | Location to link this purchase to | Yes |
---
Create a use
```plaintext
POST /api/v1/create_use
```
| Argument Name | Description | Required? |
| ------------- | --------------------------------- | --------- |
| upc | UPC of the product which was used | Yes |
| quantity | Quantity of the product used | Yes |
| date | Date of this usage | Yes |
| location | Location to link this usage to | Yes |

108
libdb.py Normal file
View File

@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""
libdb
Library for interacting with InvMan DB using SQLAlchemy
"""
from configparser import ConfigParser
from sqlalchemy import create_engine, Column
from sqlalchemy import String, Text, Date
from sqlalchemy import BigInteger, Integer, SmallInteger, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
def load_config():
"""Load PostgreSQL config from postgres.ini"""
filename = "postgres.ini"
section = "postgresql"
parser = ConfigParser()
parser.read(filename)
config = {}
if parser.has_section(section):
params = parser.items(section)
for param in params:
config[param[0]] = param[1]
else:
raise Exception(
'Section {0} not found in the {1} file'.format(section, filename))
return config
cfg = load_config()
engine = create_engine(f"postgresql://{cfg['user']}:{cfg['pass']}@{cfg['host']}/{cfg['database']}",
echo='debug')
Base = declarative_base()
Session = sessionmaker()
Session.configure(bind=engine)
class Location(Base):
"""location table"""
__tablename__ = 'location'
name = Column(String(length=32), primary_key=True)
description = Column(Text)
class ProductQuantity(Base):
"""product_quantity table"""
__tablename__ = 'product_quantity'
product_upc = Column(String(length=32), primary_key=True)
name = Column(Text)
quantity = Column(BigInteger)
location = Column(String(length=32), primary_key=True)
class Product(Base):
"""product table"""
__tablename__ = 'product'
upc = Column(String(length=32), primary_key=True)
brand = Column(String(length=63))
name = Column(Text)
description = Column(Text)
size = Column(Float)
sizeunit = Column(String(length=32))
class Brand(Base):
"""brand table"""
__tablename__ = 'brand'
name = Column(String(length=63), primary_key=True)
description = Column(Text)
class Unit(Base):
"""unit table"""
__tablename__ = 'unit'
name = Column(String(length=32), primary_key=True)
description = Column(Text)
class Purchase(Base):
"""purchase table"""
__tablename__ = 'purchase'
id = Column(Integer, primary_key=True)
product_upc = Column(String(length=32))
quantity = Column(SmallInteger)
date = Column(Date)
location = Column(String(length=32))
class Use(Base):
"""use table"""
__tablename__ = 'use'
id = Column(Integer, primary_key=True)
product_upc = Column(String(length=32))
quantity = Column(SmallInteger)
date = Column(Date)
location = Column(String(length=32))

6
postgres.ini.default Normal file
View File

@ -0,0 +1,6 @@
[postgresql]
user=someone
pass=SuperSecretPassword
host=example.com
port=5432
database=inventory

View File

@ -1,3 +1,4 @@
Flask
Flask_API
psycopg2-binary
sqlalchemy

View File

@ -0,0 +1,27 @@
<!DOCTYPE HTML>
<html>
<body>
<table border=1>
<tr>
<th>UPC</th>
<td>Brand</td>
<td>Name</td>
<td>Size</td>
<td>Size Unit</td>
<td>Description</td>
</tr>
{% for upc, brand, name, size, sizeunit, description in data %}
<tr>
<th> {{ upc }} </th>
<td> {{ brand }} </td>
<td> {{ name }} </td>
<td> {{ size }} </td>
<td> {{ sizeunit }} </td>
<td> {{ description }} </td>
</tr>
{% endfor %}
</table>
</body>
</html>

View File

@ -0,0 +1,31 @@
<!DOCTYPE HTML>
<html>
<body>
<form action="./purchases">
<input type="text" placeholder="Location" name="location" />
<input type="submit" />
<br>
<input type="reset" value="Reset" onclick="parent.location='./purchases'" />
</form>
<table border=1>
<tr>
<th>ID</th>
<td>UPC</td>
<td>Quantity</td>
<td>Date</td>
<td>Location</td>
</tr>
{% for id, upc, quantity, date, location in data %}
<tr>
<th> {{ id }}</th>
<th> {{ upc }} </th>
<td> {{ quantity }} </td>
<td> {{ date }}</td>
<td> {{ location }} </td>
</tr>
{% endfor %}
</table>
</body>
</html>

View File

@ -0,0 +1,27 @@
<!DOCTYPE HTML>
<html>
<body>
<form action="./quantities">
<input type="text" placeholder="Location" name="location" />
<input type="submit" />
<br>
<input type="reset" value="Reset" onclick="parent.location='./quantities'" />
</form>
<table border=1>
<tr>
<th>UPC</th>
<td>Quantity</td>
<td>Location</td>
</tr>
{% for upc, quantity, location in data %}
<tr>
<th> {{ upc }} </th>
<td> {{ quantity }} </td>
<td> {{ location }} </td>
</tr>
{% endfor %}
</table>
</body>
</html>

31
templates/ui/uses.html Normal file
View File

@ -0,0 +1,31 @@
<!DOCTYPE HTML>
<html>
<body>
<form action="./uses">
<input type="text" placeholder="Location" name="location" />
<input type="submit" />
<br>
<input type="reset" value="Reset" onclick="parent.location='./uses'" />
</form>
<table border=1>
<tr>
<th>ID</th>
<td>UPC</td>
<td>Quantity</td>
<td>Date</td>
<td>Location</td>
</tr>
{% for id, upc, quantity, date, location in data %}
<tr>
<th> {{ id }}</th>
<th> {{ upc }} </th>
<td> {{ quantity }} </td>
<td> {{ date }}</td>
<td> {{ location }} </td>
</tr>
{% endfor %}
</table>
</body>
</html>

View File

@ -1,54 +0,0 @@
#!/usr/bin/env python3
"""
libinv
Library for interacting with inventory database.
"""
# Imports
import psycopg2
from configparser import ConfigParser
def config(filename="postgres.ini", section="postgresql"):
parser = ConfigParser()
parser.read(filename)
db = {}
if parser.has_section(section):
params = parser.items(section)
for param in params:
db[param[0]] = param[1]
else:
raise Exception('Section {0} not found in the {1} file'.format(section, filename))
return db
def connect():
""" Connect to the PostgreSQL database server """
conn = None
try:
params = config()
print("Connecting to the PostgreSQL database...")
conn = psycopg2.connect(**params)
cur = conn.cursor()
# execute a statement
print("Postgres version:")
cur.execute("SELECT version()")
# display the database version
db_version = cur.fetchone()
print(db_version)
except (Exception, psycopg2.DatabaseError) as e:
print(e)
finally:
if conn is not None:
conn.close()
print("Database connection closed.")
if __name__ == '__main__':
connect()

581
webapi.py
View File

@ -1,301 +1,394 @@
#!/usr/bin/env python3
"""
InvMan
InvMan Web API
Web API for InvMan
"""
# Imports
from flask import Flask, request, jsonify, abort, g
from flask import Flask, jsonify, request
from flask_api import status
import psycopg2
from psycopg2 import pool
from configparser import ConfigParser
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.exc import IntegrityError
from libdb import Session, Location, ProductQuantity, Product, Brand, Unit, Purchase, Use
app = Flask(__name__) # app is the Flask app
filename = "postgres.ini"
section = "postgresql"
parser = ConfigParser()
parser.read(filename)
config = {}
if parser.has_section(section):
params = parser.items(section)
for param in params:
config[param[0]] = param[1]
else:
raise Exception('Section {0} not found in the {1} file'.format(section, filename))
app.config['postgreSQL_pool'] = psycopg2.pool.SimpleConnectionPool(1, 20,
user = config['user'],
password = config['password'],
host = config['host'],
port = config['port'],
database = config['database'])
def get_db():
if 'db' not in g:
g.db = app.config['postgreSQL_pool'].getconn()
return g.db
@app.teardown_appcontext
def close_conn(e):
db = g.pop('db', None)
if db is not None:
app.config['postgreSQL_pool'].putconn(db)
@app.route("/api/v1/locations", methods = ["GET"])
@app.route("/api/v1/locations", methods=["GET"])
def get_locations():
db = get_db()
with db.cursor() as cur:
cur.execute("SELECT name,description FROM location")
data = cur.fetchall()
if len(data) <= 0:
print("no results returned, sending 404")
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
else:
print("ran SELECT name,description FROM location")
data2 = {}
for row in data:
data2[row[0]] = {}
data2[row[0]]['description'] = row[1]
# data2 = []
# for row in data:
# data2.append({'name': row[0], 'description': row[1]})
print("processed data:")
print(data2)
return jsonify(data2)
"""Route to get locations"""
session = Session()
data = {}
try:
for name, desc in session.query(Location.name, Location.description):
data[name] = {'description': desc}
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
return jsonify(data)
@app.route("/api/v1/location/name/<search>", methods = ["GET"])
@app.route("/api/v1/location/name/<search>", methods=["GET"])
def api_get_location_information(search):
db = get_db()
with db.cursor() as cur:
cur.execute("SELECT name,description FROM location WHERE name = %s", (search,))
data = cur.fetchall()
print("ran SELECT name,description FROM location WHERE name = %s")
if len(data) <= 0:
print("no results returned, sending 404")
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
else:
data2 = {}
row = data[0]
data2['name'] = row[0]
data2['description'] = row[1]
print("processed data:")
print(data2)
return jsonify(data2)
"""Route to get information about a location"""
session = Session()
try:
data = session.query(Location.name, Location.description). \
filter(Location.name == search).one()
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
data2 = {}
data2['name'] = data.name
data2['description'] = data.description
return jsonify(data2)
@app.route("/api/v1/location/<location>/quantities", methods = ["GET"])
@app.route("/api/v1/location/<location>/quantities", methods=["GET"])
def api_get_current_quantities(location):
db = get_db()
with db.cursor() as cur:
cur.execute("SELECT product_upc,quantity FROM product_quantity WHERE location = %s", (location,))
data = cur.fetchall()
print("ran SELECT product_upc,quantity FROM product_quantity WHERE location = %s")
if len(data) <= 0:
print("no results returned, sending 404")
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
else:
data2 = {}
for row in data:
data2[row[0]] = row[1]
print("processed data:")
print(data2)
return jsonify(data2)
"""Route to get quantities in a location"""
session = Session()
data = {}
try:
for upc, quantity in session.query(ProductQuantity.product_upc, ProductQuantity.quantity) \
.filter(ProductQuantity.location == location).all():
data[upc] = quantity
return jsonify(data)
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
@app.route("/api/v1/location/<location>/quantity/<searchmethod>/<search>", methods = ["GET"])
@app.route("/api/v1/location/<location>/quantity/<searchmethod>/<search>", methods=["GET"])
def api_get_quantity_of_product_in_location(location, searchmethod, search):
db = get_db()
with db.cursor() as cur:
methodmap = {'upc': 'product_upc', 'name': 'name'}
if searchmethod in methodmap.keys():
method = methodmap[searchmethod]
cur.execute("SELECT quantity FROM product_quantity WHERE location = %s AND {0} = %s".format(method), (location, search))
data = cur.fetchall()
print("ran SELECT quantity FROM product_quantity WHERE location = %s AND {0} = %s".format(method))
if len(data) <= 0:
print("no results returned, sending 404")
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
else:
data2 = data[0][0]
print("processed data:")
print(data2)
return jsonify(data2)
"""Route to get the quantity of a product at a location"""
session = Session()
try:
if searchmethod == 'upc':
data = session.query(ProductQuantity.quantity).filter(
ProductQuantity.location == location,
ProductQuantity.product_upc == search).one()
return jsonify(data[0])
elif searchmethod == 'name':
data = session.query(ProductQuantity.quantity).filter(
ProductQuantity.location == location,
ProductQuantity.name == search).one()
return jsonify(data[0])
else:
print("invalid search method, sending 400")
return jsonify({'error': 'INVALID_SEARCH_METHOD'}), status.HTTP_400_BAD_REQUEST
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
@app.route("/api/v1/products", methods = ["GET"])
@app.route("/api/v1/products", methods=["GET"])
def api_get_products():
db = get_db()
with db.cursor() as cur:
cur.execute("SELECT upc,brand,name,size,sizeunit,description FROM product")
data = cur.fetchall()
print("ran SELECT upc,brand,name,size,sizeunit,description FROM product")
if len(data) <= 0:
print("no results returned, sending 404")
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
else:
data2 = {}
for row in data:
data2[row[0]] = {'brand': row[1], 'name': row[2], 'size': row[3], 'sizeunit': row[4], 'description': row[5]}
print("processed data:")
print(data2)
return jsonify(data2)
"""Route to get a list of products"""
session = Session()
try:
data = {}
for upc, brand, name, size, sizeunit, description in session.query(
Product.upc, Product.brand, Product.name,
Product.size, Product.sizeunit, Product.description).all():
data[upc] = {'brand': brand, 'name': name, 'size': size,
'sizeunit': sizeunit, 'description': description}
return jsonify(data)
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
@app.route("/api/v1/product/<searchmethod>/<search>", methods = ["GET"])
@app.route("/api/v1/product/<searchmethod>/<search>", methods=["GET"])
def api_get_product_information(searchmethod, search):
db = get_db()
with db.cursor() as cur:
if searchmethod in ['name', 'upc']:
cur.execute("SELECT upc,brand,name,size,sizeunit,description FROM product WHERE {0} = %s".format(searchmethod), (search,))
data = cur.fetchall()
print("ran SELECT upc,brand,name,size,sizeunit,description FROM product WHERE {0} = %s")
if len(data) <= 0:
print("no results returned, sending 404")
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
else:
row = data[0]
"""Route to get information about a product"""
session = Session()
try:
if searchmethod == 'name':
result = session.query(Product.upc, Product.brand, Product.name,
Product.size, Product.sizeunit, Product.description).filter(
Product.name == search
).one()
data = {'upc': result[0], 'brand': result[1], 'name': result[2], 'size': result[3],
'sizeunit': result[4], 'description': result[5]}
return jsonify(data)
data2 = {}
data2['upc'] = row[0]
data2['brand'] = row[1]
data2['name'] = row[2]
data2['size'] = row[3]
data2['sizeunit'] = row[4]
data2['description'] = row[5]
if searchmethod == 'upc':
result = session.query(Product.upc, Product.brand, Product.name,
Product.size, Product.sizeunit, Product.description).filter(
Product.upc == search
).one()
data = {'upc': result[0], 'brand': result[1], 'name': result[2], 'size': result[3],
'sizeunit': result[4], 'description': result[5]}
return jsonify(data)
print("processed data:")
print(data2)
return jsonify(data2)
else:
print("invalid search method, sending 400")
return jsonify({'error': 'INVALID_SEARCH_METHOD'}), status.HTTP_400_BAD_REQUEST
@app.route("/api/v1/brands", methods = ["GET"])
def api_list_brands():
db = get_db()
with db.cursor() as cur:
cur.execute("SELECT name,description FROM brand")
data = cur.fetchall()
print("ran SELECT name,description FROM brand")
if len(data) <= 0:
print("no results returned, sending 404")
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
else:
data2 = {}
for row in data:
data2[row[0]] = {'description': row[1]}
print("processed data:")
print(data2)
return jsonify(data2)
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
@app.route("/api/v1/brand/name/<search>", methods = ["GET"])
@app.route("/api/v1/brands", methods=["GET"])
def api_get_brands():
"""Route to get a list of all brands"""
session = Session()
try:
data = {}
for name, description in session.query(Brand.name, Brand.description).all():
data[name] = {'description': description}
return jsonify(data)
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
@app.route("/api/v1/brand/name/<search>", methods=["GET"])
def api_get_brand_by_name(search):
db = get_db()
with db.cursor() as cur:
cur.execute("SELECT name,description FROM brand WHERE name = %s", (search,))
data = cur.fetchall()
print("ran SELECT name,description FROM brand WHERE name = %s")
if len(data) <= 0:
print("no results returned, sending 404")
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
else:
data2 = {}
for row in data:
data2[row[0]] = {'description': row[1]}
print("processed data:")
print(data2)
return jsonify(data2)
"""Route to get information about a location"""
session = Session()
try:
data = session.query(Brand.name, Brand.description).filter(
Brand.name == search).one()
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
data2 = {'name': data.name, 'description': data.description}
return jsonify(data2)
@app.route("/api/v1/create_location", methods = ["POST"])
@app.route("/api/v1/units", methods=["GET"])
def api_get_units():
"""Route to get a list of all units"""
session = Session()
try:
data = {}
for name, description in session.query(Unit.name, Unit.description).all():
data[name] = {'description': description}
return jsonify(data)
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
@app.route("/api/v1/unit/name/<search>", methods=["GET"])
def api_get_unit_by_name(search):
"""Route to get information about a unit"""
session = Session()
try:
data = session.query(Unit.name, Unit.description).filter(
Unit.name == search).one()
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
data2 = {'name': data.name, 'description': data.description}
return jsonify(data2)
@app.route("/api/v1/purchases", methods=["GET"])
def api_get_purchases():
"""Route to get a list of all purchases"""
session = Session()
try:
data = {}
query = session.query(Purchase.id,
Purchase.product_upc, Purchase.quantity, Purchase.date,
Purchase.location)
for purchase_id, upc, quantity, date, location in query:
data[purchase_id] = {'upc': upc, 'quantity': quantity,
'date': str(date), 'location': location}
return jsonify(data)
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
@app.route("/api/v1/purchase/id/<search>", methods=["GET"])
def api_get_purchase_by_id(search):
"""Route to get information about a purchase"""
session = Session()
try:
data = session.query(Purchase.id, Purchase.product_upc, Purchase.quantity,
Purchase.date, Purchase.location).filter(Purchase.id == search).one()
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
data2 = {'id': data.id, 'upc': data.product_upc,
'quantity': data.quantity, 'date': str(data.date), 'location': data.location}
return jsonify(data2)
@app.route("/api/v1/uses", methods=["GET"])
def api_get_uses():
"""Route to get a list of all uses"""
session = Session()
try:
data = {}
query = session.query(Use.id,
Use.product_upc, Use.quantity, Use.date,
Use.location)
for use_id, upc, quantity, date, location in query:
data[use_id] = {'upc': upc, 'quantity': quantity,
'date': str(date), 'location': location}
return jsonify(data)
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
@app.route("/api/v1/use/id/<search>", methods=["GET"])
def api_get_use_by_id(search):
"""Route to get information about a use"""
session = Session()
try:
data = session.query(Use.id, Use.product_upc, Use.quantity,
Use.date, Use.location).filter(Use.id == search).one()
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
data2 = {'id': data.id, 'upc': data.product_upc,
'quantity': data.quantity, 'date': str(data.date), 'location': data.location}
return jsonify(data2)
@app.route("/api/v1/create_location", methods=["POST"])
def api_create_location():
db = get_db()
with db.cursor() as cur:
if "name" not in request.form:
return jsonify({'error': 'NO_NAME_PROVIDED'}), status.HTTP_400_BAD_REQUEST
else:
locname = request.form['name']
if "description" in request.form:
locdesc = request.form['description']
else:
locdesc = None
"""Route to create a new location"""
session = Session()
if "name" not in request.form:
return jsonify({'error': 'NO_NAME_PROVIDED'}), status.HTTP_400_BAD_REQUEST
else:
locname = request.form['name']
if "description" in request.form:
locdesc = request.form['description']
else:
locdesc = None
print(locname, locdesc)
cur.execute("INSERT INTO location (name, description) VALUES (%s, %s)", (locname, locdesc))
print("ran INSERT INTO location (name, description) VALUES (%s, %s)")
db.commit()
print("committed changes to database")
return jsonify({'api_endpoint': '/api/v1/location/name/{0}'.format(locname)})
newlocation = Location(name=locname, description=locdesc)
session.add(newlocation)
print("added newlocation")
try:
session.commit()
print("committed")
return jsonify({'api_endpoints': f'/api/v1/location/name/{locname}'})
except IntegrityError:
session.rollback()
return jsonify({'error': 'INTEGRITY_ERROR'})
@app.route("/api/v1/create_brand", methods = ["POST"])
@app.route("/api/v1/create_brand", methods=["POST"])
def api_create_brand():
db = get_db()
with db.cursor() as cur:
if "name" not in request.form:
return jsonify({'error': 'NO_NAME_PROVIDED'}), status.HTTP_400_BAD_REQUEST
else:
locname = request.form['name']
if "description" in request.form:
locdesc = request.form['description']
else:
locdesc = None
"""Route to create a new brand"""
session = Session()
if "name" not in request.form:
return jsonify({'error': 'NO_NAME_PROVIDED'}), status.HTTP_400_BAD_REQUEST
else:
brandname = request.form['name']
if "description" in request.form:
branddesc = request.form['description']
else:
branddesc = None
print(locname, locdesc)
cur.execute("INSERT INTO brand (name, description) VALUES (%s, %s)", (locname, locdesc))
print("ran INSERT INTO brand (name, description) VALUES (%s, %s)")
db.commit()
print("committed changes to database")
return jsonify({'api_endpoint': '/api/v1/brand/name/{0}'.format(locname)})
newbrand = Brand(name=brandname, description=branddesc)
session.add(newbrand)
print("added newbrand")
try:
session.commit()
print("committed")
return jsonify({'api_endpoints': [f'/api/v1/brand/name/{brandname}']})
except IntegrityError:
session.rollback()
return jsonify({'error': 'INTEGRITY_ERROR'})
# @app.route("/<location>/api/v1/item/<item>/<brand>/new_brand", methods = ["POST"])
# def api_new_brand(location, item, brand):
# try:
# storage.init_brand(location, item, brand)
# print("Created brand {0} under item {1} at {2}".format(brand, item, location))
# return jsonify({'status': 'success'})
# except storage.AlreadyExistsError:
# return jsonify({'status': 'error', 'error': 'BRAND_ALREADY_EXISTS'}), status.HTTP_403_FORBIDDEN
@app.route("/api/v1/create_product", methods=["POST"])
def api_create_product():
"""Route to create a new product"""
session = Session()
reqfields = ['upc', 'brand', 'name', 'size', 'sizeunit']
for field in reqfields:
if field not in request.form:
return jsonify({'error': f'NO_{field.upper()}_PROVIDED'}), status.HTTP_400_BAD_REQUEST
if "description" in request.form:
desc = request.form['description']
else:
desc = None
newproduct = Product(upc=request.form['upc'], brand=request.form['brand'],
name=request.form['name'], size=request.form['size'],
sizeunit=request.form['sizeunit'],
description=desc)
session.add(newproduct)
print("added newproduct")
try:
session.commit()
print("committed")
return jsonify({'api_endpoints': [f'/api/v1/product/upc/{request.form["upc"]}',
f'/api/v1/product/name/{request.form["name"]}']})
except IntegrityError:
session.rollback()
return jsonify({'error': 'INTEGRITY_ERROR'})
# @app.route("/<location>/api/v1/item/<item>/new_item", methods = ["POST"])
# def api_new_item(location, item):
# try:
# storage.init_item(location, item)
# print("Created item {0} at {1}".format(item, location))
# return jsonify({'status': 'success'})
# except storage.AlreadyExistsError:
# return jsonify({'status': 'error', 'error': 'ITEM_ALREADY_EXISTS'}), status.HTTP_403_FORBIDDEN
@app.route("/api/v1/create_unit", methods=["POST"])
def api_create_unit():
"""Route to create a new unit"""
session = Session()
if "name" not in request.form:
return jsonify({'error': 'NO_NAME_PROVIDED'}), status.HTTP_400_BAD_REQUEST
else:
unitname = request.form['name']
if "description" in request.form:
unitdesc = request.form['description']
else:
unitdesc = None
newunit = Unit(name=unitname, description=unitdesc)
session.add(newunit)
print("added newunit")
try:
session.commit()
print("committed")
return jsonify({'api_endpoints': [f'/api/v1/unit/name/{unitname}']})
except IntegrityError:
session.rollback()
return jsonify({'error': 'INTEGRITY_ERROR'})
# @app.route("/<location>/api/v1/new_location", methods = ["POST"])
# def api_new_location(location):
# try:
# storage.init_location(location)
# print("Created location {0}".format(location))
# return jsonify({'status': 'success'})
# except storage.AlreadyExistsError:
# return jsonify({'status': 'error', 'error': 'ITEM_ALREADY_EXISTS'}), status.HTTP_403_FORBIDDEN
@app.route("/api/v1/create_purchase", methods=["POST"])
def api_create_purchase():
"""Route to create a new purchase"""
session = Session()
reqfields = ['upc', 'quantity', 'date', 'location']
for field in reqfields:
if field not in request.form:
return jsonify({'error': f'NO_{field.upper()}_PROVIDED'}), status.HTTP_400_BAD_REQUEST
newpurchase = Purchase(product_upc=request.form['upc'], quantity=request.form['quantity'],
date=request.form['date'], location=request.form['location'])
session.add(newpurchase)
print("added newpurchase")
try:
session.commit()
print("committed")
return jsonify({'api_endpoints': [f'/api/v1/purchase/id/{newpurchase.id}']})
except IntegrityError:
session.rollback()
return jsonify({'error': 'INTEGRITY_ERROR'})
# @app.route("/<location>/api/v1/item/<item>/<brand>/add_amount/<amount>", methods = ["POST"])
# def api_add_amount(location, item, brand, amount):
# storage.increment_amount(location, item, brand, amount)
# return jsonify({'status': 'success'})
@app.route("/api/v1/create_use", methods=["POST"])
def api_create_use():
"""Route to create a new use"""
session = Session()
reqfields = ['upc', 'quantity', 'date', 'location']
for field in reqfields:
if field not in request.form:
return jsonify({'error': f'NO_{field.upper()}_PROVIDED'}), status.HTTP_400_BAD_REQUEST
newuse = Use(product_upc=request.form['upc'], quantity=request.form['quantity'],
date=request.form['date'], location=request.form['location'])
session.add(newuse)
print("added newuse")
try:
session.commit()
print("committed")
return jsonify({'api_endpoints': [f'/api/v1/use/id/{newuse.id}']})
except IntegrityError:
session.rollback()
return jsonify({'error': 'INTEGRITY_ERROR'})
if __name__ == "__main__":

81
webui.py Normal file
View File

@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""
InvMan Web UI
Web User Interface for InvMan
"""
# Imports
from flask import Flask, render_template, request
from libdb import Session, Product, ProductQuantity, Purchase, Use
app = Flask(__name__) # app is the Flask app
@app.route("/ui/products")
def products_list_page():
"""Route for the products page"""
session = Session()
data = session.query(
Product.upc, Product.brand, Product.name,
Product.size, Product.sizeunit, Product.description).all()
return render_template("ui/products.html", data=data)
@app.route("/ui/quantities")
def products_quantities_page():
"""Route for the quantities page"""
session = Session()
query = session.query(ProductQuantity.product_upc,
ProductQuantity.quantity, ProductQuantity.location)
if "location" in request.args:
searchloc = request.args['location']
if len(request.args['location']) > 0:
data = query.filter(ProductQuantity.location ==
request.args['location']).all()
else:
data = query.all()
else:
searchloc = ""
data = query.all()
return render_template("ui/quantities.html", data=data, searchloc=searchloc)
@app.route("/ui/purchases")
def products_purchases_page():
"""Route for the purchases page"""
session = Session()
query = session.query(Purchase.id, Purchase.product_upc,
Purchase.quantity, Purchase.date, Purchase.location)
if "location" in request.args:
searchloc = request.args['location']
if len(request.args['location']) > 0:
data = query.filter(Purchase.location ==
request.args['location']).all()
else:
data = query.all()
else:
searchloc = ""
data = query.all()
return render_template("ui/purchases.html", data=data, searchloc=searchloc)
@app.route("/ui/uses")
def products_uses_page():
"""Route for the uses page"""
session = Session()
query = session.query(Use.id, Use.product_upc,
Use.quantity, Use.date, Use.location)
if "location" in request.args:
searchloc = request.args['location']
if len(request.args['location']) > 0:
data = query.filter(Use.location ==
request.args['location']).all()
else:
data = query.all()
else:
searchloc = ""
data = query.all()
return render_template("ui/uses.html", data=data, searchloc=searchloc)