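Contents
- I. Adding the fabric8 dependency
- II. Authentication
- 1. Authenticating with a kubeconfig file
- 2. Authenticating with an OAuth token
- III. Querying and iterating over pods
- IV. Creating and deleting namespaces
- V. Creating and deleting deployments
- Notes on selected parameters
- 1. resourceRequirements
- 2. containerPorts
- 3. envVarList
- 4. volumeMounts and volumeList
- 5. nodeAffinity
- VI. Creating and deleting a single pod
- VII. Creating a DaemonSet
- VIII. Labeling a node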
I. Adding the fabric8 dependency
<dependency>
    <groupId>io.fabric8</groupId>
    <artifactId>kubernetes-client</artifactId>
    <version>5.10.2</version>
</dependency>
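II. Authentication
Authentication is straightforward: once you have the Config object that carries the credentials, create the client like this.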
KubernetesClient client = new DefaultKubernetesClient(config);
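For convenience, it is best to persist each cluster's connection information in a database and keep the resulting client in a cache. When several Kubernetes clusters have to be managed, each needs its own KubernetesClient, so the cache should hold a mapping from cluster code to client.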
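The cluster-code-to-client mapping described above could be sketched as follows. This is a minimal illustration using only the JDK: ClusterClientCache and its methods are hypothetical names, the type parameter C stands in for KubernetesClient, and the factory function is assumed to build a client from whatever connection information is stored for that cluster code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical cache holding one client per cluster code; C stands in for KubernetesClient. */
final class ClusterClientCache<C extends AutoCloseable> {
    private final Map<String, C> clients = new ConcurrentHashMap<>();
    private final Function<String, C> factory;

    ClusterClientCache(Function<String, C> factory) {
        this.factory = factory;
    }

    /** Returns the cached client for the cluster, creating it on first use. */
    C get(String clusterCode) {
        return clients.computeIfAbsent(clusterCode, factory);
    }

    /** Removes and closes the client, for example after the cluster's credentials change. */
    void evict(String clusterCode) {
        C client = clients.remove(clusterCode);
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to close client for " + clusterCode, e);
        }
    }

    int size() {
        return clients.size();
    }

    public static void main(String[] args) {
        // A dummy AutoCloseable stands in for a real client in this demo.
        ClusterClientCache<AutoCloseable> cache = new ClusterClientCache<AutoCloseable>(
                code -> () -> System.out.println("closed " + code));
        AutoCloseable first = cache.get("cluster-a");
        System.out.println(first == cache.get("cluster-a")); // the same instance is reused
        cache.evict("cluster-a");
    }
}
```

computeIfAbsent keeps client creation atomic per key, so two concurrent requests for the same cluster do not end up with two clients.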
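So how do we obtain the config used for authentication? There are two common ways: a kubeconfig file or an OAuth token.
When a client is first created, it is also worth checking connectivity before using it for anything else, for example: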
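try {
    // A cheap read is enough to prove that the API server is reachable and the credentials are accepted
    NonNamespaceOperation<Namespace, NamespaceList, Resource<Namespace>> namespaces = kubernetesClient.namespaces();
    if (namespaces != null) {
        namespaces.withName("default").get();
    }
} catch (Exception e) {
    throw new XxxException(Xxxxxx);
} finally {
    // Close the client only if it was created just for this check; a client that will be cached must stay open
    kubernetesClient.close();
}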
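1. Authenticating with a kubeconfig file
The kubeconfig file is in /root/.kube/ on the management (control-plane) node. After it is uploaded through the web page, the backend receives it as, say, a byte[] file field of a fileUploadReqBO object: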
Config config;
try {
    // Decode the uploaded bytes as UTF-8 (java.nio.charset.StandardCharsets) and parse them as a kubeconfig
    String kubeConfig = new String(fileUploadReqBO.getFile(), StandardCharsets.UTF_8);
    config = Config.fromKubeconfig(kubeConfig);
} catch (Exception e) {
    throw new XxxException(xxxx, e.getMessage());
}
2. Authenticating with an OAuth token
Obtaining an OAuth token requires a ServiceAccount with admin privileges. If you do not have one, create it by hand.
Create the ServiceAccount, here named test-admin:
kubectl create serviceaccount test-admin -n kube-system
Grant it cluster-admin privileges:
kubectl create clusterrolebinding my-service-account-admin --clusterrole=cluster-admin --serviceaccount=kube-system:test-admin
Run the following command:
kubectl get secret -n kube-system|grep admin
Find the entry in the output whose name starts with test-admin-token-, then run:
kubectl describe secret test-admin-token-XXX -n kube-system
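The output contains the token. On Kubernetes 1.24 and later, token Secrets are no longer created automatically for ServiceAccounts, so the grep above may return nothing; in that case kubectl create token test-admin -n kube-system issues a time-limited token instead.
Once you have the token, you can run
kubectl auth can-i create deployments --as=system:serviceaccount:kube-system:test-admin --token=
to check whether the account has administrator privileges: yes means it does, no means it does not.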
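ServiceAccount tokens are JWTs whose sub claim has the form system:serviceaccount:<namespace>:<name>, so before building a Config it can be useful to confirm that the pasted token really belongs to test-admin. The sketch below uses only the JDK; TokenSubject and subjectOf are hypothetical names, and the code only decodes the payload. It does not verify the signature, so treat it as a sanity check on user input, not as authentication.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that reads the "sub" claim of a JWT without verifying its signature. */
final class TokenSubject {
    private static final Pattern SUB = Pattern.compile("\"sub\"\\s*:\\s*\"([^\"]+)\"");

    /** Returns the subject claim, or null if the input does not look like a JWT. */
    static String subjectOf(String jwt) {
        String[] parts = jwt.trim().split("\\.");
        if (parts.length != 3) {
            return null;
        }
        try {
            // The payload is the second segment, encoded as unpadded URL-safe Base64
            byte[] decoded = Base64.getUrlDecoder().decode(parts[1]);
            Matcher matcher = SUB.matcher(new String(decoded, StandardCharsets.UTF_8));
            return matcher.find() ? matcher.group(1) : null;
        } catch (IllegalArgumentException e) {
            return null; // not valid Base64
        }
    }

    public static void main(String[] args) {
        // Build a throwaway token with a fake signature purely for demonstration
        Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
        String header = encoder.encodeToString("{\"alg\":\"RS256\"}".getBytes(StandardCharsets.UTF_8));
        String payload = encoder.encodeToString(
                "{\"sub\":\"system:serviceaccount:kube-system:test-admin\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(subjectOf(header + "." + payload + ".c2ln"));
    }
}
```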
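Now suppose we have a masterUrl, for example https://10.20.66.152:6443 (6443 is usually the default kube-apiserver port), together with the token. Note that withTrustCerts(true) skips verification of the API server certificate, which is convenient for testing but is better replaced by the cluster's CA certificate in production.
The config can then be built like this: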
Config config = new ConfigBuilder().withTrustCerts(true).build();
config.setMasterUrl(masterUrl);
config.setOauthToken(oauthToken);
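III. Querying and iterating over pods
Query all pods:
// Assumes a client has already been created: KubernetesClient client = new DefaultKubernetesClient(config);
// inAnyNamespace() makes the cluster-wide scope explicit; without it the list is limited to the client's configured namespace, if one is set
PodList podList = client.pods().inAnyNamespace().list();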
Query by namespace:
PodList podList = client.pods().inNamespace(K8sGenericConstant.K8S_NAMESPACE_ENGINE_SERVER).list();
Iterate over the pods:
if (podList != null && podList.getItems() != null) {
    for (Pod pod : podList.getItems()) {
        // Pod name
        String podName = pod.getMetadata().getName();
        // Name of the node the pod is running on
        String nodeName = pod.getSpec().getNodeName();
        // Pod labels
        Map<String, String> labels = pod.getMetadata().getLabels();
        // Namespace
        String ns = pod.getMetadata().getNamespace();
        // Status: per-container statuses, the pod-level reason, and the pod conditions
        List<ContainerStatus> containerStatuses = pod.getStatus().getContainerStatuses();
        String reason = pod.getStatus().getReason();
        List<PodCondition> podConditions = pod.getStatus().getConditions();
        if (!CollectionUtils.isEmpty(podConditions)) {
            // Use the first condition's reason and message as a more detailed explanation
            PodCondition podCondition = podConditions.get(0);
            reason = podCondition.getReason() + ":" + podCondition.getMessage();
        }
    }
}
IV. Creating and deleting namespaces
Create:
NonNamespaceOperation<Namespace, NamespaceList, Resource<Namespace>> namespaces = client.namespaces();
if (namespaces == null) {
    return null;
}
String name = "test-ns";
Map<String, String> labels = Maps.newHashMap();
labels.put("testlabel", "testvalue");
Namespace ns = new NamespaceBuilder().withNewMetadata().withName(name).addToLabels(labels).endMetadata().build();
ns = namespaces.createOrReplace(ns);
Delete:
NonNamespaceOperation<Namespace, NamespaceList, Resource<Namespace>> namespaces = client.namespaces();
if (namespaces == null) {
    return null;
}
namespaces.withName(name).delete();
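V. Creating and deleting deployments
Delete (either form works):
// Delete by object: this overload takes a Deployment instance that has been fetched first, not just its name
client.apps().deployments().inNamespace(namespace).delete(deployment);
// Delete by name
client.apps().deployments().inNamespace(namespace).withName(deploymentname).delete();
Create: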
Deployment deployment = new DeploymentBuilder()
        .withNewMetadata()
            .withName(podName)
        .endMetadata()
        .withNewSpec()
            .withNewSelector()
                .addToMatchLabels(matchLabels)
            .endSelector()
            .withReplicas(1)
            .withNewTemplate()
                .withNewMetadata()
                    .withLabels(matchLabels)
                    .withNamespace(namespace)
                    .withAnnotations(annotations)
                .endMetadata()
                .withNewSpec()
                    .addNewContainer()
                        .withName(podName)
                        .withImage(imageUrl)
                        .withImagePullPolicy(K8sImagePullPolicyEnum.IF_NOT_PRESENT.getValue())
                        .withResources(resourceRequirements)
                        .withPorts(containerPorts)
                        .withEnv(envVarList)
                        .withVolumeMounts(volumeMounts)
                        .withCommand(commandList)
                        .withArgs(argList)
                    .endContainer()
                    .withVolumes(volumeList)
                    .withNewAffinity()
                        .withNodeAffinity(nodeAffinity)
                    .endAffinity()
                    .withNodeSelector(nodeSelector)
                .endSpec()
            .endTemplate()
        .endSpec()
        .build();
client.apps().deployments().inNamespace(namespace).create(deployment);
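Notes on selected parameters
Parameters such as podName, namespace and imageUrl are Strings, and commandList and argList are of type List<String>. Several others have to be built beforehand: matchLabels, annotations and nodeSelector are of type Map<String, String>, and the rest are constructed as in the following examples:
1. resourceRequirements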
ResourceRequirements resourceRequirements = new ResourceRequirements();
Map<String, Quantity> limits = new HashMap<>();
limits.put("cpu", new Quantity("2000m"));
limits.put("memory", new Quantity("20480Mi"));
limits.put("nvidia.kubernetes.io/gpu", new Quantity("1"));
Map<String, Quantity> requests = new HashMap<>();
requests.put("cpu", new Quantity("1000m"));
requests.put("memory", new Quantity("10240Mi"));
requests.put("nvidia.kubernetes.io/gpu", new Quantity("1"));
resourceRequirements.setRequests(requests);
resourceRequirements.setLimits(limits);
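Note that the keys passed to limits.put() must match the resource names reported by kubectl describe node. The GPU here is exposed as nvidia.kubernetes.io/gpu; if your cluster uses hardware from another vendor, or the device plugin registers a different name (the NVIDIA device plugin, for example, normally registers nvidia.com/gpu), use whatever your environment reports. In practice these values are usually made configurable or passed in as parameters; they are hard-coded here only because this is an example. The relevant part of the describe node output looks like this: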
Capacity:
  cpu: 8
  ephemeral-storage: 308468608Ki
  hugepages-1Gi: 0
  hugepages-2Mi: 0
  memory: 32771060Ki
  nvidia.kubernetes.io/gpu: 1
  pods: 200
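To make the quantity strings above concrete: the suffix m means thousandths of a CPU core, and Ki, Mi and Gi are powers of 1024 bytes, so "2000m" is 2 cores and "20480Mi" is 20 GiB. The following JDK-only sketch converts such strings so that a request can be compared against the node capacity. QuantitySketch and its methods are hypothetical names, and only the suffixes used in this article are handled; real Kubernetes quantities allow more forms.

```java
import java.math.BigDecimal;

/** Hypothetical converter for the quantity suffixes used in this article (m, Ki, Mi, Gi). */
final class QuantitySketch {

    /** Converts a CPU quantity such as "2000m" or "8" to a number of cores. */
    static BigDecimal cpuCores(String quantity) {
        if (quantity.endsWith("m")) {
            // "m" is millicores: shift the decimal point three places to the left
            return new BigDecimal(quantity.substring(0, quantity.length() - 1)).movePointLeft(3);
        }
        return new BigDecimal(quantity);
    }

    /** Converts a memory quantity with a Ki/Mi/Gi suffix, or a plain byte count, to bytes. */
    static long memoryBytes(String quantity) {
        String[] suffixes = {"Ki", "Mi", "Gi"};
        long factor = 1024L;
        for (String suffix : suffixes) {
            if (quantity.endsWith(suffix)) {
                return Long.parseLong(quantity.substring(0, quantity.length() - 2)) * factor;
            }
            factor *= 1024L;
        }
        return Long.parseLong(quantity);
    }

    /** True if the requested memory does not exceed the capacity. */
    static boolean memoryFits(String requested, String capacity) {
        return memoryBytes(requested) <= memoryBytes(capacity);
    }

    public static void main(String[] args) {
        System.out.println(cpuCores("2000m").stripTrailingZeros().toPlainString() + " cores");
        System.out.println(memoryBytes("20480Mi") + " bytes");
        System.out.println(memoryFits("20480Mi", "32771060Ki"));
    }
}
```

With the values from this section, the 20480Mi limit fits within the node's 32771060Ki of memory, and the 2000m CPU limit is a quarter of its 8 cores.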
2. containerPorts
containerPorts must be of type List<ContainerPort>.
It can be built as in the following method:
public synchronized List<ContainerPort> buildContainerPorts() {
    List<ContainerPort> containerPortList = Lists.newArrayList();
    // In real use a List<ContainerPortBO> should be passed in as a parameter; it is hard-coded here as a demonstration
    ContainerPort port = new ContainerPort();
    port.setHostPort(32111);
    port.setName("web-port");
    port.setProtocol("TCP");
    port.setContainerPort(32111);
    containerPortList.add(port);
    // From here on, assume we already have a containerPortList
    containerPortList = containerPortList.stream()
            .filter(p -> p.getHostPort() != null && p.getContainerPort() != null)
            .collect(Collectors.toList());
    if (CollectionUtils.isEmpty(containerPortList)) {
        return null;
    }
    // If the caller specifies the ports directly, simply return containerPortList here.
    // When we have to assign ports ourselves, we try to avoid conflicts. The steps below cannot rule them out
    // completely, but they prevent the case where a node can run only one pod while the others stay Pending.
    // 1. Collect the host ports used by every pod (host ports are node-wide, so look at all namespaces)
    PodList podList = K8sClientTool.getKubernetesClient().pods().inAnyNamespace().list();
    Set<Integer> excludeNodePortList = Sets.newHashSet();
    if (podList != null && podList.getItems() != null) {
        for (Pod pod : podList.getItems()) {
            List<Integer> portList = pod.getSpec().getContainers().stream()
                    .flatMap(m -> m.getPorts().stream()
                            .filter(p -> p.getHostPort() != null)
                            .map(ContainerPort::getHostPort))
                    .collect(Collectors.toList());
            excludeNodePortList.addAll(portList);
        }
    }
    // 2. Collect the ports in use on the machine where this component (aid) is installed. It usually runs on
    //    the master node of the cluster, so this covers the most important ports.
    try {
        String result = SshTool.doExecute("netstat -nlpt | grep -Po '\\d+(?=.+)' | sort -rn | xargs -n1");
        if (StringUtils.isNotEmpty(result)) {
            excludeNodePortList.addAll(Arrays.stream(result.split("\n"))
                    .map(s -> Integer.parseInt(s.trim()))
                    .collect(Collectors.toList()));
        }
    } catch (Exception e) {
        throw new ComputeResourceException(AidServerErrorCode.ERR_DEVICE_SSH_CONNECT);
    }
    // 3. Resolve port usage and conflicts. Concurrent calls must not interfere, which is why the method is synchronized.
    List<Pair<Integer, Long>> needRemovePortPairList = Lists.newArrayList();
    // 4. Add the ports that the configuration file tells us to exclude
    excludeNodePortList.addAll(Arrays.stream(excludeNodePorts.split(","))
            .map(s -> Integer.parseInt(s.trim()))
            .collect(Collectors.toList()));
    // 5. Add the ports handed out recently. They may not be in real use yet, but they are cached so that
    //    two requests are never given the same port. Entries older than the time to live are expired.
    long expireBefore = System.currentTimeMillis() - DEFAULT_TIME_TO_LIVE;
    for (Pair<Integer, Long> pair : excludeNodePortPairList) {
        if (pair.getRight() < expireBefore) {
            needRemovePortPairList.add(pair);
        } else {
            excludeNodePortList.add(pair.getLeft());
        }
    }
    // 6. Remove the expired cache entries
    excludeNodePortPairList.removeAll(needRemovePortPairList);
    LOGGER.info("containerPortList={}, excludeNodePortList={}", containerPortList, excludeNodePortList);
    containerPortList.forEach(c -> {
        // Prefer the requested hostPort; fall back to a random port if it is already taken
        Integer hostPort = c.getHostPort();
        while (excludeNodePortList.contains(hostPort)) {
            hostPort = RandomUtils.nextInt(minNodePort, maxNodePort);
        }
        excludeNodePortList.add(hostPort);
        excludeNodePortPairList.add(Pair.of(hostPort, System.currentTimeMillis()));
        // Port names must be lowercase, use "-" instead of "_", and be at most 15 characters long
        if (StringUtils.isNotEmpty(c.getName())) {
            c.setName(c.getName().toLowerCase().replaceAll("_", "-"));
            if (c.getName().length() > 15) {
                c.setName(c.getName().substring(0, 15));
            }
        }
        c.setHostPort(hostPort);
    });
    LOGGER.info("containerPortList={}", containerPortList);
    return containerPortList;
}
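The core of the method above is the reservation logic: prefer the requested port, exclude ports that are in use or were handed out recently, and let old reservations expire. A minimal JDK-only sketch of just that part follows. HostPortAllocator is a hypothetical class, the current time is passed in explicitly to keep the behavior predictable, and unlike the loop above it fails fast when the range is exhausted instead of retrying forever.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical host-port allocator with time-limited reservations. */
final class HostPortAllocator {
    private final int minPort;   // inclusive
    private final int maxPort;   // exclusive
    private final long ttlMillis;
    private final Map<Integer, Long> reservations = new HashMap<>(); // port -> time it was handed out

    HostPortAllocator(int minPort, int maxPort, long ttlMillis) {
        this.minPort = minPort;
        this.maxPort = maxPort;
        this.ttlMillis = ttlMillis;
    }

    /** Returns the preferred port if it is free, otherwise a random free port from the range. */
    synchronized int allocate(int preferred, Set<Integer> portsInUse, long nowMillis) {
        // Expire reservations older than the time to live so their ports can be reused
        reservations.values().removeIf(reservedAt -> reservedAt < nowMillis - ttlMillis);

        Set<Integer> excluded = new HashSet<>(portsInUse);
        excluded.addAll(reservations.keySet());

        int port = preferred;
        if (excluded.contains(port)) {
            List<Integer> free = new ArrayList<>();
            for (int candidate = minPort; candidate < maxPort; candidate++) {
                if (!excluded.contains(candidate)) {
                    free.add(candidate);
                }
            }
            if (free.isEmpty()) {
                throw new IllegalStateException("No free host port in range " + minPort + "-" + maxPort);
            }
            port = free.get(ThreadLocalRandom.current().nextInt(free.size()));
        }
        reservations.put(port, nowMillis);
        return port;
    }

    public static void main(String[] args) {
        HostPortAllocator allocator = new HostPortAllocator(30000, 32768, 60_000L);
        Set<Integer> inUse = new HashSet<>();
        long now = System.currentTimeMillis();
        System.out.println(allocator.allocate(32111, inUse, now)); // preferred port is free
        System.out.println(allocator.allocate(32111, inUse, now)); // now reserved, so another port is chosen
    }
}
```

Picking from an explicit list of free ports costs a scan of the range, but it guarantees termination, which the retry loop does not when every port in the range is excluded.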
3. envVarList
List<EnvVar> envVarList = Lists.newArrayList();
EnvVar envVar = new EnvVar();
envVar.setName("TEST_ENV_KEY");
envVar.setValue("TEST_ENV_VALUE");
envVarList.add(envVar);
4. volumeMounts and volumeList
Assume the parameter is passed in as a List<Map<String, String>>, for example:
"volumeMounts": [{"name": "test-name", "mountPath": "/home/test", "hostPath": "/home/test"}]
volumeMounts:
public List<VolumeMount> buildVolumeMounts(List<Map<String, String>> volumeMountMapList) {
    List<VolumeMount> volumeMounts = Lists.newArrayList();
    if (!CollectionUtils.isEmpty(volumeMountMapList)) {
        for (Map<String, String> map : volumeMountMapList) {
            // TypeTool.castToBean is not part of fabric8; it appears to be a project helper that maps the entries onto a bean
            volumeMounts.add(TypeTool.castToBean(map, VolumeMount.class));
        }
    }
    // Hand-written equivalent for the example above:
    // VolumeMount testVolumeMount = new VolumeMount();
    // testVolumeMount.setName("test-name");
    // testVolumeMount.setMountPath("/home/test");
    // volumeMounts.add(testVolumeMount);
    return volumeMounts;
}
volumeList:
public List<Volume> buildVolumes(List<VolumeMount> volumeMounts, List<Map<String, String>> volumeMountMapList) {
    return volumeMounts.stream().map(m -> {
        Volume volume = new Volume();
        volume.setName(m.getName());
        // Default to the mount path; use the "hostPath" entry of the matching map when it is present
        String path = m.getMountPath();
        if (!CollectionUtils.isEmpty(volumeMountMapList)) {
            Optional<Map<String, String>> optional = volumeMountMapList.stream()
                    .filter(p -> m.getName().equals(p.get("name")))
                    .findFirst();
            if (optional.isPresent()) {
                Map<String, String> volumeMap = optional.get();
                if (volumeMap.containsKey("hostPath")) {
                    path = volumeMap.get("hostPath");
                }
            }
        }
        HostPathVolumeSource hostPath = new HostPathVolumeSource();
        hostPath.setPath(path);
        volume.setHostPath(hostPath);
        return volume;
    }).collect(Collectors.toList());
}
5. nodeAffinity
List<NodeSelectorRequirement> matchExpressions = Lists.newArrayList();
matchExpressions.add(new NodeSelectorRequirementBuilder()
        .withKey("nvidia.kubernetes.io/gpu")
        .withOperator("In")
        // GpuTypeEnum.toContainerValues() returns a List<String>
        .withValues(GpuTypeEnum.toContainerValues())
        .build());
NodeAffinity nodeAffinity = new NodeAffinityBuilder()
        .withNewRequiredDuringSchedulingIgnoredDuringExecution()
            .withNodeSelectorTerms(new NodeSelectorTermBuilder()
                    .withMatchExpressions(matchExpressions)
                    .build())
        .endRequiredDuringSchedulingIgnoredDuringExecution()
        .build();
VI. Creating and deleting a single pod
Delete (either form works):
// Delete by object
client.pods().inNamespace(namespace).delete(pod);
// Delete by name
client.pods().inNamespace(namespace).withName(podname).delete();
Create:
Pod podToCreate = new PodBuilder()
        .withNewMetadata()
            .withName(podName)
            .withNamespace(namespace)
            .withLabels(labels)
            .withAnnotations(annotations)
        .endMetadata()
        .withNewSpec()
            .addNewContainer()
                .withName(podName)
                .withImage(imageUrl)
                .withImagePullPolicy("IfNotPresent")
                .withResources(resourceRequirements)
                .withPorts(containerPorts)
                .withEnv(envVarList)
                .withVolumeMounts(volumeMounts)
                .withCommand(commandList)
                .withArgs(argList)
            .endContainer()
            .withNodeSelector(nodeSelector)
            .withRestartPolicy("OnFailure")
            .withVolumes(volumeList)
            // Add a toleration if the pod must be schedulable on tainted nodes
            .addNewToleration().withEffect("NoSchedule").withOperator("Exists").endToleration()
            // Node selection policy
            .withNewAffinity().withNodeAffinity(nodeAffinity).endAffinity()
        .endSpec()
        .build();
Pod pod = null;
try {
    pod = client.pods().inNamespace(namespace).create(podToCreate);
} catch (Exception e) {
    // Do not swallow the failure silently: log it here, or rethrow it as a business exception
    LOGGER.error("Failed to create pod {}", podName, e);
}
The parameters used here are much the same as for the deployment, so they are not repeated.
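VII. Creating a DaemonSet
This is largely the same as creating a deployment, except that it goes through client.apps().daemonSets()
and, compared with the example above, has no replicas setting, so it is not covered in more detail here.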
VIII. Labeling a node
// Fetch the nodes
NodeList nodeList = client.nodes().list();
// Pick the node we need, matched here by UID
Optional<Node> optionalNode = nodeList.getItems().stream()
        .filter(e -> e.getMetadata().getUid().equals(indexCode))
        .findFirst();
if (!optionalNode.isPresent()) {
    throw new XxxException();
}
Node node = optionalNode.get();
// Read the existing labels, guarding against a node that has none
Map<String, String> labels = node.getMetadata().getLabels();
if (labels == null) {
    labels = new HashMap<>();
}
// Add the new label
labels.put("xxx", "xxx");
// Write the labels back to the node object
node.getMetadata().setLabels(labels);
// Save
client.nodes().createOrReplace(node);
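One detail worth isolating from the steps above is the label merge itself: the existing label map may be null for an object without labels, and mutating the map returned by the getter changes the fetched object in place. A small JDK-only sketch of a null-safe merge that returns a fresh map is shown below; LabelMerge and merge are hypothetical names, not part of fabric8.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that merges new labels into a possibly missing label map. */
final class LabelMerge {

    /** Returns a new map with the existing labels plus the additions; additions win on key conflicts. */
    static Map<String, String> merge(Map<String, String> existing, Map<String, String> additions) {
        Map<String, String> merged = new LinkedHashMap<>();
        if (existing != null) {
            merged.putAll(existing);
        }
        merged.putAll(additions);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> existing = new LinkedHashMap<>();
        existing.put("kubernetes.io/os", "linux");
        Map<String, String> additions = new LinkedHashMap<>();
        additions.put("gpu", "true");
        System.out.println(merge(existing, additions));
        System.out.println(merge(null, additions)); // a missing label map is handled as well
    }
}
```

The result could then be passed to node.getMetadata().setLabels(...) before saving the node.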