#!/usr/bin/env python3
"""
InvMan Web API

Web API for InvMan. Exposes read endpoints for locations, products, brands
and per-location product quantities, plus create endpoints for locations and
brands, backed by a PostgreSQL connection pool.
"""

# Imports
from configparser import ConfigParser

from flask import Flask, request, jsonify, abort, g
from flask_api import status
import psycopg2
from psycopg2 import pool

app = Flask(__name__)  # app is the Flask app

# Database settings are read from an INI file located relative to the
# current working directory.
filename = "postgres.ini"
section = "postgresql"
parser = ConfigParser()
parser.read(filename)
config = {}
if parser.has_section(section):
    config = dict(parser.items(section))
else:
    # parser.read() silently ignores a missing file, so a missing file and a
    # missing section both end up here.
    raise Exception('Section {0} not found in the {1} file'.format(section, filename))

# NOTE(review): psycopg2 documents SimpleConnectionPool as not shareable
# across threads. If this app is served by a multi-threaded WSGI server,
# ThreadedConnectionPool (same getconn/putconn API) is probably what is
# wanted here -- confirm against the deployment.
app.config['postgreSQL_pool'] = psycopg2.pool.SimpleConnectionPool(
    1,
    20,
    user=config['user'],
    password=config['password'],
    host=config['host'],
    port=config['port'],
    database=config['database'],
)

# URL search methods accepted by the quantity endpoint, mapped to the column
# each one filters on. Only these whitelisted names are ever interpolated
# into SQL text; all values go through query parameters.
_QUANTITY_SEARCH_COLUMNS = {'upc': 'product_upc', 'name': 'name'}

# URL search methods accepted by the product endpoint; each is also the name
# of the column it filters on.
_PRODUCT_SEARCH_COLUMNS = ('name', 'upc')


def get_db():
    """Return the connection for the current app context.

    A connection is taken from the pool on first use and cached on
    ``flask.g`` so later calls in the same context reuse it.
    """
    if 'db' not in g:
        g.db = app.config['postgreSQL_pool'].getconn()
    return g.db


@app.teardown_appcontext
def close_conn(e):
    """Give the context's connection (if one was taken) back to the pool.

    ``e`` is the value Flask passes to teardown handlers; it is not used.
    """
    db = g.pop('db', None)
    if db is not None:
        app.config['postgreSQL_pool'].putconn(db)


def _query_rows(query, params=None):
    """Execute ``query`` with ``params``, log it, and return all rows."""
    with get_db().cursor() as cur:
        cur.execute(query, params)
        rows = cur.fetchall()
    print("ran {0}".format(query))
    return rows


def _no_results():
    """Log and build the 404 response used when a query returns no rows."""
    print("no results returned, sending 404")
    return jsonify({'error': 'NO_RESULTS_RETURNED'}), status.HTTP_404_NOT_FOUND


def _invalid_search_method():
    """Log and build the 400 response used for an unknown search method."""
    print("invalid search method, sending 400")
    return jsonify({'error': 'INVALID_SEARCH_METHOD'}), status.HTTP_400_BAD_REQUEST


def _send(payload):
    """Log ``payload`` and return it as a JSON response."""
    print("processed data:")
    print(payload)
    return jsonify(payload)


@app.route("/api/v1/locations", methods=["GET"])
def get_locations():
    """List every location.

    Returns a JSON object mapping each location name to
    ``{"description": ...}``, or 404 ``NO_RESULTS_RETURNED`` when the query
    returns no rows.
    """
    rows = _query_rows("SELECT name,description FROM location")
    if not rows:
        return _no_results()
    return _send({name: {'description': description} for name, description in rows})


@app.route("/api/v1/location/name/<search>", methods=["GET"])
def api_get_location_information(search):
    """Look up a single location by name.

    ``search`` is the location name taken from the URL. Returns
    ``{"name": ..., "description": ...}`` for the first matching row, or 404
    ``NO_RESULTS_RETURNED`` when nothing matches.
    """
    rows = _query_rows(
        "SELECT name,description FROM location WHERE name = %s", (search,)
    )
    if not rows:
        return _no_results()
    name, description = rows[0]
    return _send({'name': name, 'description': description})


@app.route("/api/v1/location/<location>/quantities", methods=["GET"])
def api_get_current_quantities(location):
    """List the quantity of every product recorded for ``location``.

    Returns a JSON object mapping product UPC to quantity, or 404
    ``NO_RESULTS_RETURNED`` when the location has no quantity rows.
    """
    rows = _query_rows(
        "SELECT product_upc,quantity FROM product_quantity WHERE location = %s",
        (location,),
    )
    if not rows:
        return _no_results()
    return _send({upc: quantity for upc, quantity in rows})


@app.route("/api/v1/location/<location>/quantity/<searchmethod>/<search>", methods=["GET"])
def api_get_quantity_of_product_in_location(location, searchmethod, search):
    """Return the quantity of one product at ``location``.

    ``searchmethod`` selects the column that ``search`` is compared against
    and must be ``"upc"`` or ``"name"``; anything else yields 400
    ``INVALID_SEARCH_METHOD``. Returns the quantity of the first matching row
    as a bare JSON value, or 404 ``NO_RESULTS_RETURNED`` when nothing matches.
    """
    column = _QUANTITY_SEARCH_COLUMNS.get(searchmethod)
    if column is None:
        return _invalid_search_method()
    # Safe to format: ``column`` comes from the whitelist above, never from
    # the request.
    query = "SELECT quantity FROM product_quantity WHERE location = %s AND {0} = %s".format(column)
    rows = _query_rows(query, (location, search))
    if not rows:
        return _no_results()
    return _send(rows[0][0])


@app.route("/api/v1/products", methods=["GET"])
def api_get_products():
    """List every product.

    Returns a JSON object mapping each UPC to its ``brand``, ``name``,
    ``size``, ``sizeunit`` and ``description``, or 404
    ``NO_RESULTS_RETURNED`` when the query returns no rows.
    """
    rows = _query_rows("SELECT upc,brand,name,size,sizeunit,description FROM product")
    if not rows:
        return _no_results()
    return _send({
        upc: {
            'brand': brand,
            'name': name,
            'size': size,
            'sizeunit': sizeunit,
            'description': description,
        }
        for upc, brand, name, size, sizeunit, description in rows
    })


@app.route("/api/v1/product/<searchmethod>/<search>", methods=["GET"])
def api_get_product_information(searchmethod, search):
    """Look up a single product by name or UPC.

    ``searchmethod`` must be ``"name"`` or ``"upc"``; anything else yields
    400 ``INVALID_SEARCH_METHOD``. Returns the first matching row as a JSON
    object with ``upc``, ``brand``, ``name``, ``size``, ``sizeunit`` and
    ``description``, or 404 ``NO_RESULTS_RETURNED`` when nothing matches.
    """
    if searchmethod not in _PRODUCT_SEARCH_COLUMNS:
        return _invalid_search_method()
    # Safe to format: ``searchmethod`` was checked against the whitelist.
    query = "SELECT upc,brand,name,size,sizeunit,description FROM product WHERE {0} = %s".format(searchmethod)
    rows = _query_rows(query, (search,))
    if not rows:
        return _no_results()
    upc, brand, name, size, sizeunit, description = rows[0]
    return _send({
        'upc': upc,
        'brand': brand,
        'name': name,
        'size': size,
        'sizeunit': sizeunit,
        'description': description,
    })


@app.route("/api/v1/brands", methods=["GET"])
def api_list_brands():
    """List every brand.

    Returns a JSON object mapping each brand name to
    ``{"description": ...}``, or 404 ``NO_RESULTS_RETURNED`` when the query
    returns no rows.
    """
    rows = _query_rows("SELECT name,description FROM brand")
    if not rows:
        return _no_results()
    return _send({name: {'description': description} for name, description in rows})


@app.route("/api/v1/brand/name/<search>", methods=["GET"])
def api_get_brand_by_name(search):
    """Look up brands by exact name.

    Returns a JSON object mapping each matching brand name to
    ``{"description": ...}``, or 404 ``NO_RESULTS_RETURNED`` when nothing
    matches.
    """
    rows = _query_rows(
        "SELECT name,description FROM brand WHERE name = %s", (search,)
    )
    if not rows:
        return _no_results()
    return _send({name: {'description': description} for name, description in rows})


@app.route("/api/v1/create_location", methods=["POST"])
def api_create_location():
    """Create a location from form fields ``name`` and optional ``description``.

    Returns 400 ``NO_NAME_PROVIDED`` when ``name`` is absent; otherwise
    inserts the row, commits, and returns the API path of the new location
    under ``api_endpoint``.
    """
    # Validate before checking a connection out of the pool.
    if "name" not in request.form:
        return jsonify({'error': 'NO_NAME_PROVIDED'}), status.HTTP_400_BAD_REQUEST
    locname = request.form['name']
    locdesc = request.form.get('description')  # None when not supplied
    print(locname, locdesc)
    db = get_db()
    with db.cursor() as cur:
        cur.execute("INSERT INTO location (name, description) VALUES (%s, %s)", (locname, locdesc))
    print("ran INSERT INTO location (name, description) VALUES (%s, %s)")
    db.commit()
    print("committed changes to database")
    return jsonify({'api_endpoint': '/api/v1/location/name/{0}'.format(locname)})


@app.route("/api/v1/create_brand", methods=["POST"])
def api_create_brand():
    """Create a brand from form fields ``name`` and optional ``description``.

    Returns 400 ``NO_NAME_PROVIDED`` when ``name`` is absent; otherwise
    inserts the row, commits, and returns the API path of the new brand
    under ``api_endpoint``.
    """
    # Validate before checking a connection out of the pool.
    if "name" not in request.form:
        return jsonify({'error': 'NO_NAME_PROVIDED'}), status.HTTP_400_BAD_REQUEST
    locname = request.form['name']
    locdesc = request.form.get('description')  # None when not supplied
    print(f"creating brand with name {locname} and description {locdesc}")
    db = get_db()
    with db.cursor() as cur:
        cur.execute("INSERT INTO brand (name, description) VALUES (%s, %s)", (locname, locdesc))
    print("ran INSERT INTO brand (name, description) VALUES (%s, %s)")
    db.commit()
    print("committed changes to database")
    return jsonify({'api_endpoint': '/api/v1/brand/name/{0}'.format(locname)})


if __name__ == "__main__":
    print("Run with `flask` or a WSGI server!")