Begin rewrite from psycopg2 to SQLAlchemy

This commit is contained in:
BBaoVanC 2020-10-04 21:12:09 -05:00
parent ec2974eef3
commit af9cdf850c
No known key found for this signature in database
GPG Key ID: 6D74C8B0E7D791C2
4 changed files with 111 additions and 103 deletions

6
.pylintrc Normal file
View File

@ -0,0 +1,6 @@
[TYPECHECK]
ignored-modules=flask_sqlalchemy
generated-members=session.*
[MESSAGES CONTROL]
disable=pointless-string-statement

55
libdb.py Normal file
View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""
libdb
Library for interacting with InvMan DB using SQLAlchemy
"""
from configparser import ConfigParser
from sqlalchemy import create_engine, Column, String, Text, BigInteger
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
def load_config():
"""Load PostgreSQL config from postgres.ini"""
filename = "postgres.ini"
section = "postgresql"
parser = ConfigParser()
parser.read(filename)
config = {}
if parser.has_section(section):
params = parser.items(section)
for param in params:
config[param[0]] = param[1]
else:
raise Exception('Section {0} not found in the {1} file'.format(section, filename))
return config
cfg = load_config()
engine = create_engine(f"postgresql://{cfg['user']}:{cfg['pass']}@{cfg['host']}/{cfg['database']}",
echo='debug')
Base = declarative_base()
Session = sessionmaker()
Session.configure(bind=engine)
class Location(Base):
"""location table"""
__tablename__ = 'location'
name = Column(String(length=32), primary_key=True)
description = Column(Text)
class ProductQuantity(Base):
"""product_quantity table"""
__tablename__ = 'product_quantity'
product_upc = Column(String(length=32), primary_key=True)
name = Column(Text)
quantity = Column(BigInteger)
location = Column(String(length=32), primary_key=True)

View File

@ -1,3 +1,4 @@
Flask
Flask_API
psycopg2-binary
sqlalchemy

152
webapi.py
View File

@ -6,114 +6,59 @@ Web API for InvMan
"""
# Imports
from flask import Flask, request, jsonify, abort, g
from flask import Flask, jsonify
from flask_api import status
import psycopg2
from psycopg2 import pool
from configparser import ConfigParser
from sqlalchemy.orm.exc import NoResultFound
from libdb import Session, Location, ProductQuantity
app = Flask(__name__) # app is the Flask app
filename = "postgres.ini"
section = "postgresql"
parser = ConfigParser()
parser.read(filename)
config = {}
if parser.has_section(section):
params = parser.items(section)
for param in params:
config[param[0]] = param[1]
else:
raise Exception('Section {0} not found in the {1} file'.format(section, filename))
app.config['postgreSQL_pool'] = psycopg2.pool.SimpleConnectionPool(1, 20,
user = config['user'],
password = config['password'],
host = config['host'],
port = config['port'],
database = config['database'])
def get_db():
if 'db' not in g:
g.db = app.config['postgreSQL_pool'].getconn()
return g.db
@app.teardown_appcontext
def close_conn(e):
db = g.pop('db', None)
if db is not None:
app.config['postgreSQL_pool'].putconn(db)
@app.route("/api/v1/locations", methods = ["GET"])
def get_locations():
db = get_db()
with db.cursor() as cur:
cur.execute("SELECT name,description FROM location")
data = cur.fetchall()
if len(data) <= 0:
print("no results returned, sending 404")
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
else:
print("ran SELECT name,description FROM location")
data2 = {}
for row in data:
data2[row[0]] = {}
data2[row[0]]['description'] = row[1]
# data2 = []
# for row in data:
# data2.append({'name': row[0], 'description': row[1]})
print("processed data:")
print(data2)
return jsonify(data2)
"""Route to get locations"""
session = Session()
data = {}
try:
for name,desc in session.query(Location.name, Location.description):
data[name] = {'description': desc}
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
return jsonify(data)
@app.route("/api/v1/location/name/<search>", methods = ["GET"])
def api_get_location_information(search):
db = get_db()
with db.cursor() as cur:
cur.execute("SELECT name,description FROM location WHERE name = %s", (search,))
data = cur.fetchall()
print("ran SELECT name,description FROM location WHERE name = %s")
if len(data) <= 0:
print("no results returned, sending 404")
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
else:
data2 = {}
row = data[0]
data2['name'] = row[0]
data2['description'] = row[1]
print("processed data:")
print(data2)
return jsonify(data2)
"""Route to get information about a location"""
session = Session()
try:
data = session.query(Location.name, Location.description). \
filter(Location.name == search).one()
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
data2 = {}
data2['name'] = data.name
data2['description'] = data.description
return jsonify(data2)
@app.route("/api/v1/location/<location>/quantities", methods = ["GET"])
def api_get_current_quantities(location):
db = get_db()
with db.cursor() as cur:
cur.execute("SELECT product_upc,quantity FROM product_quantity WHERE location = %s", (location,))
data = cur.fetchall()
print("ran SELECT product_upc,quantity FROM product_quantity WHERE location = %s")
if len(data) <= 0:
print("no results returned, sending 404")
return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND
else:
data2 = {}
for row in data:
data2[row[0]] = row[1]
print("processed data:")
print(data2)
return jsonify(data2)
"""Route to get quantities in a location"""
session = Session()
data = {}
try:
for upc,quantity in session.query(ProductQuantity.product_upc, ProductQuantity.quantity) \
.filter(ProductQuantity.location == location).all():
data[upc] = quantity
return jsonify(data)
except NoResultFound:
return jsonify({'error': 'NO_RESULT_FOUND'}), status.HTTP_404_NOT_FOUND
@app.route("/api/v1/location/<location>/quantity/<searchmethod>/<search>", methods = ["GET"])
""" @app.route("/api/v1/location/<location>/quantity/<searchmethod>/<search>", methods = ["GET"])
def api_get_quantity_of_product_in_location(location, searchmethod, search):
db = get_db()
with db.cursor() as cur:
@ -133,10 +78,10 @@ def api_get_quantity_of_product_in_location(location, searchmethod, search):
return jsonify(data2)
else:
print("invalid search method, sending 400")
return jsonify({'error': 'INVALID_SEARCH_METHOD'}), status.HTTP_400_BAD_REQUEST
return jsonify({'error': 'INVALID_SEARCH_METHOD'}), status.HTTP_400_BAD_REQUEST """
@app.route("/api/v1/products", methods = ["GET"])
""" @app.route("/api/v1/products", methods = ["GET"])
def api_get_products():
db = get_db()
with db.cursor() as cur:
@ -152,9 +97,10 @@ def api_get_products():
data2[row[0]] = {'brand': row[1], 'name': row[2], 'size': row[3], 'sizeunit': row[4], 'description': row[5]}
print("processed data:")
print(data2)
return jsonify(data2)
return jsonify(data2) """
@app.route("/api/v1/product/<searchmethod>/<search>", methods = ["GET"])
""" @app.route("/api/v1/product/<searchmethod>/<search>", methods = ["GET"])
def api_get_product_information(searchmethod, search):
db = get_db()
with db.cursor() as cur:
@ -181,10 +127,10 @@ def api_get_product_information(searchmethod, search):
return jsonify(data2)
else:
print("invalid search method, sending 400")
return jsonify({'error': 'INVALID_SEARCH_METHOD'}), status.HTTP_400_BAD_REQUEST
return jsonify({'error': 'INVALID_SEARCH_METHOD'}), status.HTTP_400_BAD_REQUEST """
@app.route("/api/v1/brands", methods = ["GET"])
""" @app.route("/api/v1/brands", methods = ["GET"])
def api_list_brands():
db = get_db()
with db.cursor() as cur:
@ -200,10 +146,10 @@ def api_list_brands():
data2[row[0]] = {'description': row[1]}
print("processed data:")
print(data2)
return jsonify(data2)
return jsonify(data2) """
@app.route("/api/v1/brand/name/<search>", methods = ["GET"])
""" @app.route("/api/v1/brand/name/<search>", methods = ["GET"])
def api_get_brand_by_name(search):
db = get_db()
with db.cursor() as cur:
@ -219,10 +165,10 @@ def api_get_brand_by_name(search):
data2[row[0]] = {'description': row[1]}
print("processed data:")
print(data2)
return jsonify(data2)
return jsonify(data2) """
@app.route("/api/v1/create_location", methods = ["POST"])
""" @app.route("/api/v1/create_location", methods = ["POST"])
def api_create_location():
db = get_db()
with db.cursor() as cur:
@ -240,10 +186,10 @@ def api_create_location():
print("ran INSERT INTO location (name, description) VALUES (%s, %s)")
db.commit()
print("committed changes to database")
return jsonify({'api_endpoint': '/api/v1/location/name/{0}'.format(locname)})
return jsonify({'api_endpoint': '/api/v1/location/name/{0}'.format(locname)}) """
@app.route("/api/v1/create_brand", methods = ["POST"])
""" @app.route("/api/v1/create_brand", methods = ["POST"])
def api_create_brand():
db = get_db()
with db.cursor() as cur:
@ -261,7 +207,7 @@ def api_create_brand():
print("ran INSERT INTO brand (name, description) VALUES (%s, %s)")
db.commit()
print("committed changes to database")
return jsonify({'api_endpoint': '/api/v1/brand/name/{0}'.format(locname)})
return jsonify({'api_endpoint': '/api/v1/brand/name/{0}'.format(locname)}) """
if __name__ == "__main__":