阅读:71
参考:
https://www.runoob.com/django/django-nginx-uwsgi.html
https://www.jianshu.com/p/6452596c4edb
# 安装flask
$ pip install flask
然后写一个简单的服务,我就写一个控制直播推流的开关
from flask import Flask
app = Flask(__name__)
global status
@app.route('/on')
def turn_on():
global status
status = 1
return 'living server is on!'
@app.route('/off')
def turn_off():
global status
status = 0
return 'living server is off!'
@app.route('/status')
def live_status():
global status
return str(status);
if __name__ == '__main__':
global status
status = 1 ;
app.run()
写好之后可以在本地跑起来试试,端口默认5000
然后安装uwsgi
$ pip install uwsgi
在/usr/local/live/uwsgi_script/路径下建立目录写配置文件uwsgi.ini
(我们新建文件夹uwsgi_script用来放uwsgi信息)
配置的时候也要带上开头的[uwsgi],一开始我没有注意,项目一直没有运行起来
[uwsgi]
socket = 127.0.0.1:7100
chdir = /usr/local/live
wsgi-file = servers.py
callable = app
processes = 4
vacuum = true
master = true
再写nginx配置
server {
listen 8050;
server_name data.migelab.com;
charset utf-8;
client_max_body_size 75M;
location / {
include uwsgi_params; # 导入uwsgi配置
uwsgi_pass 127.0.0.1:7100; # 转发端口,需要和uwsgi配置当中的监听端口一致
uwsgi_param UWSGI_PYTHON /usr/bin/python3;
uwsgi_param UWSGI_CHDIR /usr/local/live; # 项目根目录
uwsgi_param UWSGI_SCRIPT servers:app;
}
}
# 启动nginx
./nginx
./objs/nginx -c ./conf/nginx.conf
# 启动uwsgi
uwsgi --ini /usr/local/live/uwsgi_script/uwsgi.ini &
主要是升级两个内容:加入使用POST和GET方法;
返回 JSON对象的响应;
管理一个视频流升级为管理多个视频流
参考链接:https://flask.palletsprojects.com/en/1.1.x/api/#flask.Flask.response_class
https://blog.csdn.net/weixin_41665106/article/details/105599235
使用JSON格式去响应前端的请求:
@app.route('/on', methods=['POST','GET'])
def turn_on():
global status
if request.method =='POST':
camera = request.form['camera']
print(camera)
if camera != KeyError:
status[int(camera)] = 1
response = make_response({
'status':'success',
'code':'200',
'ERROR':''
},200)
response.headers={
'content-type':'JSON'
}
return response
else :
response = make_response({
'status': 'fail',
'code': '400',
'ERROR':'camera code error!'
},400)
response.headers = {
'content-type': 'JSON'
}
return response
else:
response = make_response({
'status': 'fail',
'code': '500',
'ERROR': 'GET method error!'
}, 500)
response.headers = {
'content-type': 'JSON'
}
return response
# 解决问题:如何构建响应
# 如何带参数
# 异常类
global InvalidUsage
class InvalidUsage(Exception):
status_code = 400
def __init__(self,message,status_code=400):
Exception.__init__(self)
self.message = message
self.status_code = status_code
@app.errorhandler(InvalidUsage)
def invalid_usage(error):
response = make_response(error.message)
response.status_code = error.status_code
return response
使用了抛出异常之后,之前的响应代码就简化了许多:
@app.route('/on', methods=['POST','GET'])
def turn_on():
global status
if request.method =='POST':
camera = request.form['camera']
print(camera)
if camera != KeyError:
status[int(camera)] = 1
response = make_response({
'status':'success',
'code':'200',
'ERROR':''
},200)
response.headers={
'content-type':'JSON'
}
return response
else :
raise InvalidUsage('camera code is inValid or empty', status_code=400)
else:
raise InvalidUsage('GET method is InValid',status_code = 403)
之后部署到服务器上提示无法识别global全局变量,又去查了一下flask全局变量的使用,然后改为使用session
又增添了一些新的功能:
from flask import Flask, url_for, request, render_template, make_response, session
from enum import Enum
app = Flask(__name__)
app.config['SECRET_KEY'] = '29348817403'
# 异常类
global InvalidUsage
class InvalidUsage(Exception):
status_code = 400
def __init__(self, message, status_code=400):
Exception.__init__(self)
self.message = message
self.status_code = status_code
@app.errorhandler(InvalidUsage)
def invalid_usage(error):
response = make_response(error.message)
response.status_code = error.status_code
return response
# 枚举所有的摄像头
class Camera(Enum):
SMARTHOME_FRONT = 1 # 智能家居正面
SMARTHOME_LEFT = 2 # 智能家居左侧
SMARTHOME_RIGHT = 3 # 智能家居右侧
SMARTHOME_BACK = 4 # 智能家居背侧
# 设置服务初始值
# 摄像头状态
# 摄像头url
@app.route('/', methods=['POST', 'GET'])
def init():
for i in range(Camera.__len__()):
session[str(i)] = 0
session['camera1'] = "https://live.test.tinylink.cn/live?port=1935&app=myapp&stream=test"
session['camera2'] = "https://live.test.tinylink.cn/live?port=1935&app=smarthome&stream=left"
session['user'] = "root"
session["experiment"] = "智能家居实验"
print("init is ok")
return "ok"
# 控制摄像头打开
# 参数:摄像头枚举代码
@app.route('/on', methods=['POST', 'GET'])
def turn_on():
camera = request.form['camera']
print(session)
if camera != KeyError:
session[camera] = 1
return "200"
else:
raise InvalidUsage('camera code is inValid or empty', status_code=400)
# 关闭摄像头
# 参数:摄像头枚举代码
@app.route('/off', methods=['POST', 'GET'])
def turn_off():
camera = request.form['camera']
print(camera)
if camera != KeyError:
session[camera] = 0
return "200"
else:
raise InvalidUsage('camera code is inValid or empty', status_code=400)
# 获取摄像头状态
# 参数:摄像头枚举代码
# 返回:0(关闭)1(运行)
@app.route('/status', methods=['POST', 'GET'])
def live_status():
camera = request.values.get('camera')
print(camera)
print(session)
if camera != KeyError:
return str(session.get(camera))
else:
raise InvalidUsage('camera code is inValid or empty', status_code=400)
# 获取可用摄像头总个数
@app.route('/available', methods=['POST', 'GET'])
def camera_number():
return str(Camera.__len__())
# 获取摄像头url
# 参数:摄像头枚举代码
# 返回:摄像头url
@app.route('/get/url', methods=['POST', 'GET'])
def camera_url():
camera = request.values.get('camera')
print(camera)
print(session)
if camera != KeyError:
print(session[camera])
print(session[camera] == '0')
url = "camera" + camera
if session.get(url) is None:
raise InvalidUsage('camera url is empty', status_code=400)
if session[camera] != 0:
url = "camera" + camera
return session.get(url)
else:
raise InvalidUsage('camera is off', status_code=400)
else:
raise InvalidUsage('camera code is inValid or empty', status_code=400)
# 获取用户名
# 返回用户名或抛出异常
@app.route('/get/user', methods=['POST', 'GET'])
def get_user():
if session.get("user") is None:
raise InvalidUsage('user is empty', status_code=400)
else:
return session['user']
# 设置用户名
# 参数:用户名字符串
@app.route('/set/user', methods=['POST', 'GET'])
def set_user():
user = request.values.get('user')
if user != KeyError:
session['user'] = user
return "200"
else:
raise InvalidUsage('user is inValid or empty', status_code=400)
# 获取实验名
# 返回实验名或抛出异常
@app.route('/get/experiment', methods=['POST', 'GET'])
def get_experiment():
if session.get("experiment") is None:
raise InvalidUsage('experiment is empty', status_code=400)
else:
return session['experiment']
# 设置实验名
# 参数:实验名字符串
@app.route('/set/experiment', methods=['POST', 'GET'])
def set_experiment():
experiment = request.values.get('experiment')
if experiment != KeyError:
session['experiment'] = experiment
return "200"
else:
raise InvalidUsage('experiment is inValid or empty', status_code=400)
if __name__ == '__main__':
app.run(port=7100)