三、私有云运维开发(15分)
使用自动化运维工具 Ansible 完成系统的自动化部署与管理。
基于 OpenStack APIs 与 SDK,开发私有云运维程序。
1.OpenStack Python运维开发:实现镜像管理(7分)
编写Python代码,实现OpenStack镜像增删查改。
在controller节点的/root目录下创建create_image.py文件,编写python代码对接OpenStack API,完成镜像的上传与查询。
①创建镜像:要求在OpenStack私有云平台中上传镜像cirros-0.3.4-x86_64-disk.img,名字为pvm_image,disk_format为qcow2,container_format为bare。
②查询镜像:查询pvm_image的详细信息,并控制台输出。
# 编写Python代码,实现OpenStack镜像增查
vi create_image.py
# encoding:utf-8
import requests,json,time
def get_auth_token(controller_ip, domain, name, password):
    """Request a Keystone v3 token and wrap it in request headers.

    The token is project-scoped; the request body uses ``name`` both as the
    user name and as the project name.

    :param controller_ip: host name or IP address of the Keystone endpoint.
    :param domain: domain name used for the user and for the project scope.
    :param name: user name (also sent as the project name).
    :param password: password of the user.
    :return: ``{"X-Auth-Token": <token>}`` on success; ``None`` when the
        request fails (the error is printed to the console).
    """
    url = f"http://{controller_ip}:5000/v3/auth/tokens"
    body = {
        "auth": {
            "identity": {
                "methods": ['password'],
                "password": {
                    "user": {
                        "domain": {"name": domain},
                        "name": name,
                        "password": password,
                    }
                },
            },
            "scope": {
                "project": {
                    "domain": {"name": domain},
                    "name": name,
                }
            },
        }
    }
    headers = {"Content-Type": "application/json"}
    try:
        # Keystone returns the token in a response header, not in the body.
        token = requests.post(url, headers=headers, data=json.dumps(body)).headers['X-Subject-Token']
    except Exception as e:
        print(f"token获取失败,{e}")
        return None
    print(f"token值为:{token}")
    return {"X-Auth-Token": token}


class image_manager:
    """Small client for the image REST API rooted at ``resUrl``."""

    def __init__(self, handers: dict, resUrl):
        """
        :param handers: request headers carrying the auth token.
        :param resUrl: URL of the images collection (``.../v2/images``).
        """
        self.headers = handers
        self.resUrl = resUrl

    def create_image(self, image_name, disk_format, container_format):
        """Create an image record (metadata only) and return the response text."""
        body = {
            "name": image_name,
            "disk_format": disk_format,
            "container_format": container_format,
        }
        req = requests.post(self.resUrl, headers=self.headers, data=json.dumps(body)).text
        print(f"创建镜像的信息为:{req}")
        return req

    def get_image_id(self, name):
        """Return the id of the first image called ``name``, or ``"NONE"``."""
        req = json.loads(requests.get(self.resUrl, headers=self.headers).text)
        for image in req['images']:
            if image['name'] == name:
                return image['id']
        return "NONE"

    def upload_image(self, id, file_path: str):
        """Upload the file at ``file_path`` as the data of image ``id``.

        :return: the ``requests`` response object (status 204 means success).
        """
        url = self.resUrl + "/" + id + "/file"
        # Use a per-request copy so the binary content type does not leak
        # into the shared headers used by every other call.
        headers = dict(self.headers)
        headers["Content-Type"] = "application/octet-stream"
        # The context manager closes the file; passing the file object lets
        # requests stream it instead of loading the whole image into memory.
        with open(file_path, 'rb') as image_file:
            req = requests.put(url, headers=headers, data=image_file)
        if req.status_code == 204:
            print("上传镜像成功", req.status_code)
        else:
            print("上传镜像失败", req.status_code)
        print(f"镜像上传信息:{req}")
        return req

    def get_image(self, id):
        """Fetch and return the details of the single image ``id`` as a dict."""
        url = self.resUrl + "/" + id
        # Query the per-image URL; the collection URL would list every image.
        req = json.loads(requests.get(url, headers=self.headers).text)
        print(f"获取到的镜像信息为:{req}")
        return req

    def delete_image(self, id):
        """Delete image ``id`` and return the ``requests`` response object."""
        url = self.resUrl + "/" + id
        req = requests.delete(url, headers=self.headers)
        print(f"删除信息:{req}")
        return req


if __name__ == "__main__":
    controller_ip = "10.26.16.133"
    domain = "demo"
    name = "admin"
    password = "000000"
    # The task statement asks for the image to be named pvm_image.
    image_name = "pvm_image"

    headers = get_auth_token(controller_ip, domain, name, password)
    if headers is None:
        # Without a token every following request would be rejected.
        raise SystemExit(1)
    image_m = image_manager(headers, f"http://{controller_ip}:9292/v2/images")

    # create
    create_image = image_m.create_image(image_name, "qcow2", "bare")

    # get id
    get_id = image_m.get_image_id(image_name)
    print(f"{image_name}镜像ID为:{get_id}")
    if get_id == "NONE":
        # The image record was not created, so there is nothing to upload to.
        raise SystemExit(1)

    # upload
    upload_image = image_m.upload_image(get_id, "cirros-0.3.4-x86_64-disk.img")

    # get image
    get_image = image_m.get_image(get_id)
    with open("image_demo.json", "w") as outfile:
        json.dump(get_image, outfile, indent=4)
2.OpenStack用户管理服务接口开发(8分)
使用已建好的OpenStack Python运维开发环境,在/root目录下创建user_manager.py脚本,编写Python代码,端口为5043,IP地址为0.0.0.0,开发出OpenStack用户管理的接口,需要实现的接口如下:
①GET /user/<name>,自行调用查询接口,查询指定名称的用户;返回信息以json格式输出到控制台。
②POST /user/create,自行调用创建接口,创建名为chinaskill的用户,密码为123456,返回信息以json格式输出到控制台。
③DELETE /user/delete/<name>,自行调用删除接口,删除指定名称的用户,若删除成功,返回信息输出到控制台。
# 编写api_user_manager.py
vi api_user_manager.py
# encoding:utf-8
import requests, json, time
import logging
# ----------- logging setup -----------
# Module-level logger: every record is written to the console, prefixed
# with a timestamp.
logger = logging.getLogger(__name__)

# Message layout: "<timestamp> <message>".
format = logging.Formatter('%(asctime)s %(message)s')

# Console handler using the layout above.
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(format)
logger.addHandler(stream_handler)

# Emit everything from DEBUG upwards.
logger.setLevel(logging.DEBUG)
# ----------- end of logging setup -----------
def get_auth_token(controller_ip, domain, user, password):
    '''
    Request a project-scoped Keystone v3 token and return it as headers.

    The request body uses ``user`` both as the user name and as the project
    name. On failure the error is logged and the process exits with a
    non-zero status.

    :param controller_ip: openstack master ip address (or host name)
    :param domain: current user's domain
    :param user: user name
    :param password: user password
    :return: ``{"X-Auth-Token": <token>}`` for the current user.
    '''
    # Build the URL from the argument instead of a hard-coded host name.
    url = f"http://{controller_ip}:5000/v3/auth/tokens"
    body = {
        "auth": {
            "identity": {
                "methods": ["password"],
                "password": {
                    "user": {
                        "domain": {"name": domain},
                        "name": user,
                        "password": password,
                    }
                },
            },
            "scope": {
                "project": {
                    "domain": {"name": domain},
                    "name": user,
                }
            },
        }
    }
    headers = {"Content-Type": "application/json"}
    # NOTE: the request body is deliberately not printed; it contains the
    # plain-text password.
    try:
        # Keystone returns the token in a response header, not in the body.
        Token = requests.post(url, data=json.dumps(body), headers=headers).headers['X-Subject-Token']
    except Exception as e:
        logger.error(f"获取Token值失败,请检查访问云主机控制节点IP是否正确?输出错误信息如下:{str(e)}")
        # Exit with a failure status so calling scripts can detect the error.
        raise SystemExit(1)
    logger.debug(f"获取Token值:{str(Token)}")
    return {"X-Auth-Token": Token}
# 用户管理
# https://docs.openstack.org/api-ref/identity/v3/index.html#users
class user_manager:
    """Small client for the users REST resource rooted at ``resUrl``."""

    def __init__(self, handers: dict, resUrl: str):
        """
        :param handers: request headers carrying the auth token.
        :param resUrl: URL of the users collection (``.../v3/users``).
        """
        self.headers = handers
        self.resUrl = resUrl

    def create_users(self, user_name, password: str, desc: str):
        """Create a user with name, password and description.

        :return: the raw response text.
        """
        body = {
            "user": {
                "name": user_name,
                "password": password,
                "description": desc,
            }
        }
        status_code = requests.post(self.resUrl, data=json.dumps(body), headers=self.headers).text
        logger.debug(f"返回状态:{str(status_code)}")
        return status_code

    def get_users(self):
        """List users and return the raw response text."""
        status_code = requests.get(self.resUrl, headers=self.headers).text
        logger.debug(f"返回状态:{str(status_code)}")
        return status_code

    def get_user_id(self, user_name):
        """Return the id of the first user called ``user_name``, or ``"NONE"``."""
        result = json.loads(requests.get(self.resUrl, headers=self.headers).text)
        for item in result['users']:
            if item['name'] == user_name:
                return item['id']
        return "NONE"

    def get_user(self, id: str):
        """Fetch a single user by id and return the decoded JSON response."""
        api_url = self.resUrl + "/" + id
        result = json.loads(requests.get(api_url, headers=self.headers).text)
        logger.debug(f"返回信息:{str(result)}")
        return result

    def delete_user(self, name: str):
        """Delete the user called ``name``.

        :return: ``{"User itemDeletedSuccess": 204}`` on success, an
            ``{"error": ...}`` dict when no user has that name, otherwise
            the decoded JSON error body sent by the server.
        """
        id = self.get_user_id(name)
        if id == "NONE":
            # No such user: do not send a DELETE for the placeholder id.
            result = {"error": {"code": 404, "message": f"user {name!r} not found"}}
            logger.debug(f"返回信息:{str(result)}")
            return result
        api_url = self.resUrl + "/" + id
        response = requests.delete(api_url, headers=self.headers)
        # A successful delete answers 204 with an empty body.
        if response.status_code == 204:
            return {"User itemDeletedSuccess": response.status_code}
        result = json.loads(response.text)
        logger.debug(f"返回信息:{str(result)}")
        return result

    def update_User_password(self, id: str, original_password: str, new_password: str):
        """Change the password of user ``id``.

        :return: ``{"item Update Password Success": 204}`` on success,
            otherwise the decoded JSON error body sent by the server.
        """
        # Per-request copy: the shared headers dict is left untouched.
        headers = dict(self.headers)
        headers['Content-Type'] = "application/json"
        body = {
            "user": {
                "password": new_password,
                "original_password": original_password,
            }
        }
        api_url = self.resUrl + "/" + id + "/password"
        response = requests.post(api_url, data=json.dumps(body), headers=headers)
        # Normal response codes: 204 without return text
        if response.status_code == 204:
            return {"item Update Password Success": response.status_code}
        result = json.loads(response.text)
        logger.debug(f"返回信息:{str(result)}")
        return result
if __name__ == '__main__':
    # 1. Credentials for the OpenStack all-in-one (controller) node.
    controller_ip = "controller"  # host name or IP address of the controller
    domain = "demo"               # domain name
    user = "admin"                # user name
    password = "000000"           # user password

    headers = get_auth_token(controller_ip, domain, user, password)
    print("headers:", headers)

    user_m = user_manager(headers, "http://controller:5000/v3/users")

    # 2. List all users.
    users = user_m.get_users()
    print("查询所有users:", users)
# Write user_manager.py
vi user_manager.py
#encoding:utf-8
import argparse
import api_user_manager
import json
import csv
import yaml
# Connection settings for the controller node.
controller_ip = "controller"
domain = "demo"
user = "admin"
password = "000000"

# Authenticate when the module is loaded; the resulting headers are reused
# by every command handler defined below.
headers = api_user_manager.get_auth_token(
    controller_ip,
    domain,
    user,
    password,
)
print("headers:", headers)

# Shared client for the users API, used by the command handlers.
user_m = api_user_manager.user_manager(
    headers,
    "http://controller:5000/v3/users",
)
print("-----------begin-----------------")
def define_args(parser):
    """Register the command-line arguments supported by this program.

    :param parser: the ``argparse.ArgumentParser`` to populate (modified
        in place).
    :return: None
    """
    # Required positional argument: the operation to run.
    parser.add_argument('command',
                        help='Resource command name',
                        type=str)
    # Optional: name of the resource to act on.
    parser.add_argument('-n', '--name',
                        help='The Name of the resource',
                        type=str)
    # Optional: JSON text describing the resource.
    parser.add_argument('-i', '--input',
                        help='The input json format text ',
                        type=str)
    # Optional: path of the file the result is written to.
    parser.add_argument('-o', '--output',
                        help='The output file path ',
                        type=str)
def parse_args(parser):
    """Parse the command line with ``parser`` and run the matching handler.

    Supported commands are ``create``, ``getall``, ``get`` and ``delete``;
    any other non-empty command prints a notice and does nothing.

    :param parser: a parser already populated by ``define_args``.
    :return: None
    """
    args = parser.parse_args()
    if not args.command:
        return

    # Command name -> handler function.
    handlers = {
        "create": create_user,
        "getall": getall_users,
        "get": get_user,
        "delete": delete_user,
    }
    handler = handlers.get(args.command)
    if handler is None:
        print("Note support command name!")
        return
    print(f"{args.command} some thing")
    handler(args)
def create_user(args):
    """Create a user from the JSON text given with ``--input``.

    The JSON object must contain ``name`` and ``password``; ``description``
    is optional and defaults to an empty string. ``--output`` is not used
    by this command.

    :param args: parsed command-line arguments.
    :return: None
    """
    print('Provided command value is %r.' % args.command)
    print('Provided input value is %r.' % args.input)
    print('Provided output value is %r.' % args.output)
    if not args.input:
        # Nothing to parse: report it instead of failing inside json.loads.
        print("create requires --input with a JSON object (name, password, description)")
        return
    user_dict = json.loads(args.input)
    result = user_m.create_users(user_dict["name"],
                                 user_dict["password"],
                                 user_dict.get("description", ""))
    print("--------write to json---------:", result)
    print(result)
def delete_user(args):
    """Delete the user named by ``--name`` and print the result.

    :param args: parsed command-line arguments.
    :return: None
    """
    print('Provided command value is %r.' % args.command)
    print('Provided input value is %r.' % args.input)
    print('Provided output value is %r.' % args.output)
    if not args.name:
        # Without a name there is nothing to look up or delete.
        print("delete requires --name")
        return
    result = user_m.delete_user(args.name)
    print(result)
def getall_users(args):
    """List all users, print them, and optionally save them as YAML.

    The YAML file is written only when ``--output`` is given.

    :param args: parsed command-line arguments.
    :return: None
    """
    print('Provided command value is %r.' % args.command)
    print('Provided input value is %r.' % args.input)
    print('Provided output value is %r.' % args.output)
    print(type(args.input))
    result = user_m.get_users()
    print("--------result---------")
    print(result)
    if args.output:
        # The API returns JSON text; decode it so it can be dumped as YAML.
        configuration = json.loads(result)
        with open(args.output, 'w') as yaml_file:
            yaml.dump(configuration, yaml_file)
    print(result)
def get_user(args):
    """Look up the user named by ``--name`` and print its details.

    The details are also written as JSON to ``--output`` when it is given.

    :param args: parsed command-line arguments.
    :return: None
    """
    print('Provided command value is %r.' % args.command)
    print('Provided input value is %r.' % args.input)
    print('Provided output value is %r.' % args.output)
    if not args.name:
        print("get requires --name")
        return
    user_id = user_m.get_user_id(args.name)
    if user_id == "NONE":
        # get_user_id returns the "NONE" placeholder when no user matches.
        print("user %r not found" % args.name)
        return
    result = user_m.get_user(user_id)
    if args.output:
        with open(args.output, 'w') as jsonfile:
            json.dump(result, jsonfile, indent=4)
    print(result)
if __name__ == '__main__':
    import sys

    # Echo the raw command line, then parse it and run the chosen command.
    print(sys.argv)
    parser = argparse.ArgumentParser()
    define_args(parser)
    parse_args(parser)
# Create a user
python3 user_manager.py create --input '{ "name": "user01", "password": "000000", "description": "description" } '
# Look up the user with the given name
python3 user_manager.py get --name user01 -o user.json
# List all users visible to the current admin account
python3 user_manager.py getall -o openstack_all_user.yaml
# Delete the user with the given name
python3 user_manager.py delete --name user01