303 lines
11 KiB
Python
303 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
InvMan
|
|
"""
|
|
|
|
# Imports
|
|
from flask import Flask, request, jsonify, abort, g
|
|
from flask_api import status
|
|
import psycopg2
|
|
from psycopg2 import pool
|
|
from configparser import ConfigParser
|
|
|
|
|
|
app = Flask(__name__) # app is the Flask app
|
|
|
|
|
|
filename = "postgres.ini"
|
|
section = "postgresql"
|
|
parser = ConfigParser()
|
|
parser.read(filename)
|
|
|
|
config = {}
|
|
if parser.has_section(section):
|
|
params = parser.items(section)
|
|
for param in params:
|
|
config[param[0]] = param[1]
|
|
else:
|
|
raise Exception('Section {0} not found in the {1} file'.format(section, filename))
|
|
|
|
|
|
app.config['postgreSQL_pool'] = psycopg2.pool.SimpleConnectionPool(1, 20,
|
|
user = config['user'],
|
|
password = config['password'],
|
|
host = config['host'],
|
|
port = config['port'],
|
|
database = config['database'])
|
|
|
|
|
|
def get_db():
|
|
if 'db' not in g:
|
|
g.db = app.config['postgreSQL_pool'].getconn()
|
|
return g.db
|
|
|
|
|
|
@app.teardown_appcontext
|
|
def close_conn(e):
|
|
db = g.pop('db', None)
|
|
if db is not None:
|
|
app.config['postgreSQL_pool'].putconn(db)
|
|
|
|
|
|
@app.route("/api/v1/locations", methods = ["GET"])
|
|
def get_locations():
|
|
db = get_db()
|
|
with db.cursor() as cur:
|
|
cur.execute("SELECT name,description FROM location")
|
|
data = cur.fetchall()
|
|
if len(data) <= 0:
|
|
print("no results returned, sending 404")
|
|
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
|
|
else:
|
|
print("ran SELECT name,description FROM location")
|
|
data2 = {}
|
|
for row in data:
|
|
data2[row[0]] = {}
|
|
data2[row[0]]['description'] = row[1]
|
|
# data2 = []
|
|
# for row in data:
|
|
# data2.append({'name': row[0], 'description': row[1]})
|
|
print("processed data:")
|
|
print(data2)
|
|
return jsonify(data2)
|
|
|
|
|
|
@app.route("/api/v1/location/name/<search>", methods = ["GET"])
|
|
def api_get_location_information(search):
|
|
db = get_db()
|
|
with db.cursor() as cur:
|
|
cur.execute("SELECT name,description FROM location WHERE name = %s", (search,))
|
|
data = cur.fetchall()
|
|
print("ran SELECT name,description FROM location WHERE name = %s")
|
|
if len(data) <= 0:
|
|
print("no results returned, sending 404")
|
|
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
|
|
else:
|
|
data2 = {}
|
|
row = data[0]
|
|
data2['name'] = row[0]
|
|
data2['description'] = row[1]
|
|
print("processed data:")
|
|
print(data2)
|
|
return jsonify(data2)
|
|
|
|
|
|
@app.route("/api/v1/location/<location>/quantities", methods = ["GET"])
|
|
def api_get_current_quantities(location):
|
|
db = get_db()
|
|
with db.cursor() as cur:
|
|
cur.execute("SELECT product_upc,quantity FROM product_quantity WHERE location = %s", (location,))
|
|
data = cur.fetchall()
|
|
print("ran SELECT product_upc,quantity FROM product_quantity WHERE location = %s")
|
|
if len(data) <= 0:
|
|
print("no results returned, sending 404")
|
|
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
|
|
else:
|
|
data2 = {}
|
|
for row in data:
|
|
data2[row[0]] = row[1]
|
|
print("processed data:")
|
|
print(data2)
|
|
return jsonify(data2)
|
|
|
|
|
|
@app.route("/api/v1/location/<location>/quantity/<searchmethod>/<search>", methods = ["GET"])
|
|
def api_get_quantity_of_product_in_location(location, searchmethod, search):
|
|
db = get_db()
|
|
with db.cursor() as cur:
|
|
methodmap = {'upc': 'product_upc', 'name': 'name'}
|
|
if searchmethod in methodmap.keys():
|
|
method = methodmap[searchmethod]
|
|
cur.execute("SELECT quantity FROM product_quantity WHERE location = %s AND {0} = %s".format(method), (location, search))
|
|
data = cur.fetchall()
|
|
print("ran SELECT quantity FROM product_quantity WHERE location = %s AND {0} = %s".format(method))
|
|
if len(data) <= 0:
|
|
print("no results returned, sending 404")
|
|
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
|
|
else:
|
|
data2 = data[0][0]
|
|
print("processed data:")
|
|
print(data2)
|
|
return jsonify(data2)
|
|
else:
|
|
print("invalid search method, sending 400")
|
|
return jsonify({'error': 'INVALID_SEARCH_METHOD'}), status.HTTP_400_BAD_REQUEST
|
|
|
|
|
|
@app.route("/api/v1/products", methods = ["GET"])
|
|
def api_get_products():
|
|
db = get_db()
|
|
with db.cursor() as cur:
|
|
cur.execute("SELECT upc,brand,name,size,sizeunit,description FROM product")
|
|
data = cur.fetchall()
|
|
print("ran SELECT upc,brand,name,size,sizeunit,description FROM product")
|
|
if len(data) <= 0:
|
|
print("no results returned, sending 404")
|
|
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
|
|
else:
|
|
data2 = {}
|
|
for row in data:
|
|
data2[row[0]] = {'brand': row[1], 'name': row[2], 'size': row[3], 'sizeunit': row[4], 'description': row[5]}
|
|
print("processed data:")
|
|
print(data2)
|
|
return jsonify(data2)
|
|
|
|
@app.route("/api/v1/product/<searchmethod>/<search>", methods = ["GET"])
|
|
def api_get_product_information(searchmethod, search):
|
|
db = get_db()
|
|
with db.cursor() as cur:
|
|
if searchmethod in ['name', 'upc']:
|
|
cur.execute("SELECT upc,brand,name,size,sizeunit,description FROM product WHERE {0} = %s".format(searchmethod), (search,))
|
|
data = cur.fetchall()
|
|
print("ran SELECT upc,brand,name,size,sizeunit,description FROM product WHERE {0} = %s")
|
|
if len(data) <= 0:
|
|
print("no results returned, sending 404")
|
|
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
|
|
else:
|
|
row = data[0]
|
|
|
|
data2 = {}
|
|
data2['upc'] = row[0]
|
|
data2['brand'] = row[1]
|
|
data2['name'] = row[2]
|
|
data2['size'] = row[3]
|
|
data2['sizeunit'] = row[4]
|
|
data2['description'] = row[5]
|
|
|
|
print("processed data:")
|
|
print(data2)
|
|
return jsonify(data2)
|
|
else:
|
|
print("invalid search method, sending 400")
|
|
return jsonify({'error': 'INVALID_SEARCH_METHOD'}), status.HTTP_400_BAD_REQUEST
|
|
|
|
|
|
@app.route("/api/v1/brands", methods = ["GET"])
|
|
def api_list_brands():
|
|
db = get_db()
|
|
with db.cursor() as cur:
|
|
cur.execute("SELECT name,description FROM brand")
|
|
data = cur.fetchall()
|
|
print("ran SELECT name,description FROM brand")
|
|
if len(data) <= 0:
|
|
print("no results returned, sending 404")
|
|
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
|
|
else:
|
|
data2 = {}
|
|
for row in data:
|
|
data2[row[0]] = {'description': row[1]}
|
|
print("processed data:")
|
|
print(data2)
|
|
return jsonify(data2)
|
|
|
|
|
|
@app.route("/api/v1/brand/name/<search>", methods = ["GET"])
|
|
def api_get_brand_by_name(search):
|
|
db = get_db()
|
|
with db.cursor() as cur:
|
|
cur.execute("SELECT name,description FROM brand WHERE name = %s", (search,))
|
|
data = cur.fetchall()
|
|
print("ran SELECT name,description FROM brand WHERE name = %s")
|
|
if len(data) <= 0:
|
|
print("no results returned, sending 404")
|
|
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
|
|
else:
|
|
data2 = {}
|
|
for row in data:
|
|
data2[row[0]] = {'description': row[1]}
|
|
print("processed data:")
|
|
print(data2)
|
|
return jsonify(data2)
|
|
|
|
|
|
@app.route("/api/v1/create_location", methods = ["POST"])
|
|
def api_create_location():
|
|
db = get_db()
|
|
with db.cursor() as cur:
|
|
if "name" not in request.form:
|
|
return jsonify({'error': 'NO_NAME_PROVIDED'}), status.HTTP_400_BAD_REQUEST
|
|
else:
|
|
locname = request.form['name']
|
|
if "description" in request.form:
|
|
locdesc = request.form['description']
|
|
else:
|
|
locdesc = None
|
|
|
|
print(locname, locdesc)
|
|
cur.execute("INSERT INTO location (name, description) VALUES (%s, %s)", (locname, locdesc))
|
|
print("ran INSERT INTO location (name, description) VALUES (%s, %s)")
|
|
db.commit()
|
|
print("committed changes to database")
|
|
return jsonify({'api_endpoint': '/api/v1/location/name/{0}'.format(locname)})
|
|
|
|
|
|
@app.route("/api/v1/create_brand", methods = ["POST"])
|
|
def api_create_brand():
|
|
db = get_db()
|
|
with db.cursor() as cur:
|
|
if "name" not in request.form:
|
|
return jsonify({'error': 'NO_NAME_PROVIDED'}), status.HTTP_400_BAD_REQUEST
|
|
else:
|
|
locname = request.form['name']
|
|
if "description" in request.form:
|
|
locdesc = request.form['description']
|
|
else:
|
|
locdesc = None
|
|
|
|
print(locname, locdesc)
|
|
cur.execute("INSERT INTO brand (name, description) VALUES (%s, %s)", (locname, locdesc))
|
|
print("ran INSERT INTO brand (name, description) VALUES (%s, %s)")
|
|
db.commit()
|
|
print("committed changes to database")
|
|
return jsonify({'api_endpoint': '/api/v1/brand/name/{0}'.format(locname)})
|
|
|
|
|
|
# @app.route("/<location>/api/v1/item/<item>/<brand>/new_brand", methods = ["POST"])
|
|
# def api_new_brand(location, item, brand):
|
|
# try:
|
|
# storage.init_brand(location, item, brand)
|
|
# print("Created brand {0} under item {1} at {2}".format(brand, item, location))
|
|
# return jsonify({'status': 'success'})
|
|
# except storage.AlreadyExistsError:
|
|
# return jsonify({'status': 'error', 'error': 'BRAND_ALREADY_EXISTS'}), status.HTTP_403_FORBIDDEN
|
|
|
|
|
|
# @app.route("/<location>/api/v1/item/<item>/new_item", methods = ["POST"])
|
|
# def api_new_item(location, item):
|
|
# try:
|
|
# storage.init_item(location, item)
|
|
# print("Created item {0} at {1}".format(item, location))
|
|
# return jsonify({'status': 'success'})
|
|
# except storage.AlreadyExistsError:
|
|
# return jsonify({'status': 'error', 'error': 'ITEM_ALREADY_EXISTS'}), status.HTTP_403_FORBIDDEN
|
|
|
|
|
|
# @app.route("/<location>/api/v1/new_location", methods = ["POST"])
|
|
# def api_new_location(location):
|
|
# try:
|
|
# storage.init_location(location)
|
|
# print("Created location {0}".format(location))
|
|
# return jsonify({'status': 'success'})
|
|
# except storage.AlreadyExistsError:
|
|
# return jsonify({'status': 'error', 'error': 'ITEM_ALREADY_EXISTS'}), status.HTTP_403_FORBIDDEN
|
|
|
|
|
|
# @app.route("/<location>/api/v1/item/<item>/<brand>/add_amount/<amount>", methods = ["POST"])
|
|
# def api_add_amount(location, item, brand, amount):
|
|
# storage.increment_amount(location, item, brand, amount)
|
|
# return jsonify({'status': 'success'})
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("Run with `flask` or a WSGI server!")
|