k8s python api二次封装 例子
发布时间:2020-12-20 10:42:29 所属栏目:Python 来源:网络整理
导读:k8s python api 二次封装。安装依赖:pip install pprint kubernetes。示例代码开头:import urllib3 / from pprint import pprint / from kubernetes import client / from os import path / import yaml / class K8sApi(object): def __init__(self): # self.config = config.kube_config.load_kube_c……(摘要在此处被截断,完整代码见下文)
k8s python api二次封装
# Install the third-party dependencies first:  pip install kubernetes pyyaml
# (The original note also listed "pprint"; pprint is part of the standard
# library and does not need to be installed.)
from os import path
from pprint import pprint

import urllib3
import yaml
from kubernetes import client


class K8sApi(object):
    """Thin convenience wrapper around the official Kubernetes Python client.

    Each method forwards to exactly one client call (AppsV1Api, CoreV1Api or
    the Ingress API) and returns the client's response object unchanged,
    except ``list_node`` and ``list_pod``, which return plain dict summaries.

    Manifest-based methods (``create_*`` and some ``replace_*``) load a YAML
    file with ``yaml.safe_load``; a relative ``file`` is resolved against the
    directory containing this module.
    """

    def __init__(self, host="https://192.168.100.111:6443", token="token",
                 verify_ssl=False):
        """Build the API clients used by the wrapper methods.

        Args:
            host: URL of the Kubernetes API server.
            token: Bearer token, sent as ``Authorization: Bearer <token>``.
            verify_ssl: Whether to verify the API server's TLS certificate.
                Defaults to ``False`` as in the original example; enable it
                outside of test clusters.

        NOTE: credentials could instead be loaded from a kubeconfig file via
        ``kubernetes.config.load_kube_config()``; the original example left
        that approach commented out.
        """
        # With verification off urllib3 warns on every request; silence that
        # one warning category, as the original example did.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        self.configuration = client.Configuration()
        self.configuration.host = host
        self.configuration.api_key['authorization'] = 'Bearer ' + token
        self.configuration.verify_ssl = verify_ssl

        api_client = client.ApiClient(self.configuration)
        self.k8s_apps_v1 = client.AppsV1Api(api_client)
        self.Api_Instance = client.CoreV1Api(api_client)

        # NOTE(review): ExtensionsV1beta1Api is absent from newer releases of
        # the kubernetes client (Ingress moved to networking.k8s.io/v1).
        # Fall back to NetworkingV1Api, which exposes the same *_ingress
        # method names -- confirm against the installed client version.
        ingress_api_cls = getattr(client, "ExtensionsV1beta1Api", None)
        if ingress_api_cls is None:
            ingress_api_cls = client.NetworkingV1Api
        self.Api_Instance_Extensions = ingress_api_cls(api_client)

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _load_manifest(file):
        """Return the parsed YAML document stored in ``file``.

        A relative ``file`` is looked up next to this module; an absolute
        path is used as given (``os.path.join`` discards the prefix then).
        """
        with open(path.join(path.dirname(__file__), file)) as f:
            return yaml.safe_load(f)

    @staticmethod
    def _resolve_name(name, manifest=None):
        """Return ``name``, or ``manifest['metadata']['name']`` as fallback.

        Raises:
            ValueError: if no name was passed and none can be read from the
                manifest.
        """
        if name:
            return name
        try:
            return manifest["metadata"]["name"]
        except (KeyError, TypeError):
            raise ValueError(
                "a resource name is required: pass name=... or provide a "
                "manifest with metadata.name")

    # ------------------------------------------------------------------ #
    # Deployments
    # ------------------------------------------------------------------ #
    def list_deployment(self, namespace="default"):
        """List the Deployments in ``namespace``."""
        return self.k8s_apps_v1.list_namespaced_deployment(namespace)

    def read_deployment(self, name="nginx-deployment", namespace="default"):
        """Read the Deployment ``name`` in ``namespace``."""
        return self.k8s_apps_v1.read_namespaced_deployment(name, namespace)

    def create_deployment(self, file="deploy-nginx.yaml", namespace="default"):
        """Create a Deployment in ``namespace`` from the manifest ``file``."""
        dep = self._load_manifest(file)
        return self.k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace=namespace)

    def replace_deployment(self, file="deploy-nginx.yaml", namespace="default",
                           name=None):
        """Replace a Deployment with the manifest ``file``.

        ``name`` defaults to the manifest's ``metadata.name``.
        """
        dep = self._load_manifest(file)
        name = self._resolve_name(name, dep)
        return self.k8s_apps_v1.replace_namespaced_deployment(
            name, namespace, body=dep)

    def delete_deployment(self, namespace="default", name=None):
        """Delete the Deployment ``name`` in ``namespace`` (name required)."""
        name = self._resolve_name(name)
        return self.k8s_apps_v1.delete_namespaced_deployment(name, namespace)

    # ------------------------------------------------------------------ #
    # Nodes and Pods
    # ------------------------------------------------------------------ #
    def list_node(self):
        """Return a ``{node_name: summary}`` dict for every node.

        Each summary holds ``name``, ``status``, ``ip``, ``kubelet_version``
        and ``os_image``.  ``status`` is the type of the LAST reported
        condition when that condition's status is ``"True"``, otherwise
        ``"NotReady"``; ``ip`` is the first reported address (``None`` when
        the node reports no addresses).
        """
        data = {}
        for node in self.Api_Instance.list_node().items:
            conditions = node.status.conditions or []
            addresses = node.status.addresses or []
            last = conditions[-1] if conditions else None
            if last is not None and last.status == "True":
                status = last.type
            else:
                status = "NotReady"
            data[node.metadata.name] = {
                "name": node.metadata.name,
                "status": status,
                "ip": addresses[0].address if addresses else None,
                "kubelet_version": node.status.node_info.kubelet_version,
                "os_image": node.status.node_info.os_image,
            }
        return data

    def list_pod(self):
        """Return ``{pod_name: {"ip", "namespace"}}`` for all namespaces.

        NOTE: the dict is keyed by pod name only, so two pods with the same
        name in different namespaces overwrite each other.  The key is kept
        as-is so existing callers see the same return shape.
        """
        data = {}
        for pod in self.Api_Instance.list_pod_for_all_namespaces().items:
            data[pod.metadata.name] = {
                "ip": pod.status.pod_ip,
                "namespace": pod.metadata.namespace,
            }
        return data

    def read_pod(self, name="nginx-pod", namespace="default"):
        """Read the Pod ``name`` in ``namespace``."""
        return self.Api_Instance.read_namespaced_pod(name, namespace)

    def create_pod(self, file="pod-nginx.yaml", namespace="default"):
        """Create a Pod in ``namespace`` from the manifest ``file``."""
        dep = self._load_manifest(file)
        return self.Api_Instance.create_namespaced_pod(namespace, body=dep)

    def replace_pod(self, namespace="default", name=None):
        """Re-submit the current state of Pod ``name`` (name required).

        The pod is read from the cluster and written back unchanged, as in
        the original example.
        """
        name = self._resolve_name(name)
        dep = self.read_pod(name, namespace)
        return self.Api_Instance.replace_namespaced_pod(
            name, namespace, body=dep)

    def delete_pod(self, namespace="default", name=None):
        """Delete the Pod ``name`` in ``namespace`` (name required)."""
        name = self._resolve_name(name)
        return self.Api_Instance.delete_namespaced_pod(name, namespace)

    # ------------------------------------------------------------------ #
    # Services
    # ------------------------------------------------------------------ #
    def list_service(self):
        """List the Services of all namespaces."""
        return self.Api_Instance.list_service_for_all_namespaces()

    def read_service(self, name="", namespace="default"):
        """Read the Service ``name`` in ``namespace``."""
        return self.Api_Instance.read_namespaced_service(name, namespace)

    def create_service(self, file="service-nginx.yaml", namespace="default"):
        """Create a Service in ``namespace`` from the manifest ``file``."""
        dep = self._load_manifest(file)
        return self.Api_Instance.create_namespaced_service(namespace, body=dep)

    def replace_service(self, name="hequan", namespace="default"):
        """Re-submit the current state of Service ``name``.

        The original read a *Pod* here (``read_pod``) and omitted the
        namespace in the replace call; both are corrected.
        """
        dep = self.read_service(name, namespace)
        return self.Api_Instance.replace_namespaced_service(
            name, namespace, body=dep)

    def delete_service(self, namespace="default", name=None):
        """Delete the Service ``name`` in ``namespace`` (name required)."""
        name = self._resolve_name(name)
        return self.Api_Instance.delete_namespaced_service(name, namespace)

    # ------------------------------------------------------------------ #
    # Ingresses
    # ------------------------------------------------------------------ #
    def list_ingress(self):
        """List the Ingresses of all namespaces."""
        return self.Api_Instance_Extensions.list_ingress_for_all_namespaces()

    def read_ingress(self, namespace="default", name=None):
        """Read the Ingress ``name`` in ``namespace`` (name required)."""
        name = self._resolve_name(name)
        return self.Api_Instance_Extensions.read_namespaced_ingress(
            name, namespace)

    def create_ingress(self, file="ingress-nginx.yaml", namespace="default"):
        """Create an Ingress in ``namespace`` from the manifest ``file``."""
        dep = self._load_manifest(file)
        return self.Api_Instance_Extensions.create_namespaced_ingress(
            namespace, body=dep)

    def replace_ingress(self, file, namespace="default", name=None):
        """Replace an Ingress with the manifest ``file``.

        ``name`` defaults to the manifest's ``metadata.name``.
        """
        dep = self._load_manifest(file)
        name = self._resolve_name(name, dep)
        return self.Api_Instance_Extensions.replace_namespaced_ingress(
            name=name, namespace=namespace, body=dep)

    def delete_ingress(self, namespace="default", name=None):
        """Delete the Ingress ``name`` in ``namespace`` (name required)."""
        name = self._resolve_name(name)
        return self.Api_Instance_Extensions.delete_namespaced_ingress(
            name, namespace)

    # ------------------------------------------------------------------ #
    # StatefulSets
    # ------------------------------------------------------------------ #
    def list_stateful(self):
        """List the StatefulSets of all namespaces."""
        return self.k8s_apps_v1.list_stateful_set_for_all_namespaces()

    def read_stateful(self, namespace="default", name=None):
        """Read the StatefulSet ``name`` in ``namespace`` (name required)."""
        name = self._resolve_name(name)
        return self.k8s_apps_v1.read_namespaced_stateful_set(name, namespace)

    def create_stateful(self, file, namespace="default"):
        """Create a StatefulSet in ``namespace`` from the manifest ``file``."""
        dep = self._load_manifest(file)
        return self.k8s_apps_v1.create_namespaced_stateful_set(
            body=dep, namespace=namespace)

    def replace_stateful(self, file, namespace="default", name=None):
        """Replace a StatefulSet with the manifest ``file``.

        ``name`` defaults to the manifest's ``metadata.name``.
        """
        dep = self._load_manifest(file)
        name = self._resolve_name(name, dep)
        return self.k8s_apps_v1.replace_namespaced_stateful_set(
            name, namespace, body=dep)

    def delete_stateful(self, namespace="default", name=None):
        """Delete the StatefulSet ``name`` in ``namespace`` (name required)."""
        name = self._resolve_name(name)
        return self.k8s_apps_v1.delete_namespaced_stateful_set(name, namespace)


if __name__ == '__main__':
    def test():
        """Smoke test: print the Deployments of the default namespace."""
        obj = K8sApi()
        ret = obj.list_deployment()
        pprint(ret)

    test()

# (Editor: Li Datong) Disclaimer from the hosting site: all content on the
# site is collected from the internet; the opinions expressed are solely the
# author's own and do not represent the site's position. If your rights have
# been unintentionally infringed, please contact the webmaster promptly to
# have the content removed.