Automated Deployment: Cluster Deployment with Ansible and Kubernetes Operators
In modern distributed-systems operations, automated deployment has become a key practice for ensuring stability and improving operational efficiency. For a distributed file storage platform, manual deployment is not only slow and labor-intensive but also error-prone, making it hard to guarantee consistent environments. This chapter takes a close look at automated deployment built on Ansible and Kubernetes Operators, helping you construct an efficient, reliable deployment pipeline.
8.2.1 Ansible Deployment
Ansible is an agentless automation tool whose concise YAML syntax and strong modular design have made it popular for Infrastructure as Code. For a distributed file storage platform, Ansible lets us automate the entire flow from environment preparation to service deployment.
8.2.1.1 Ansible Base Configuration
# ansible.cfg - Ansible configuration file
[defaults]
inventory = inventory/hosts
host_key_checking = False
retry_files_enabled = False
stdout_callback = yaml
bin_ansible_callbacks = True
[ssh_connection]
ssh_args = -o ControlMaster=auto -o ControlPersist=60s
pipelining = True
# inventory/hosts - inventory file
[metadata_servers]
metadata-1 ansible_host=192.168.1.10
metadata-2 ansible_host=192.168.1.11
metadata-3 ansible_host=192.168.1.12
[data_servers]
data-1 ansible_host=192.168.1.20
data-2 ansible_host=192.168.1.21
data-3 ansible_host=192.168.1.22
data-4 ansible_host=192.168.1.23
data-5 ansible_host=192.168.1.24
[all:vars]
dfs_version=2.1.0
dfs_install_path=/opt/dfs
dfs_data_path=/data/dfs
dfs_user=dfs
dfs_group=dfs
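Before running any playbook against this inventory, it is worth confirming that every host is reachable over SSH and that the groups resolve as intended. A quick sanity check with Ansible's built-in ping module might look like this:
# Verify SSH connectivity and Python availability on every inventory host
ansible all -i inventory/hosts -m ping
# Confirm that group membership resolves as expected
ansible metadata_servers -i inventory/hosts --list-hosts
ansible data_servers -i inventory/hosts --list-hosts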
8.2.1.2 Environment Preparation Playbook
# playbooks/prepare-environment.yml - environment preparation playbook
---
- name: Prepare the distributed file storage environment
  hosts: all
  become: yes
  vars:
    dfs_packages:
      - rsync
      - python3
      - openjdk-11-jdk
      - sysstat
      - iotop
      - htop
  tasks:
    - name: Check the operating system release
      command: cat /etc/os-release
      register: os_info
      changed_when: false
    - name: Assert that the operating system is supported
      assert:
        that:
          - "'Ubuntu' in os_info.stdout or 'CentOS' in os_info.stdout"
        fail_msg: "Unsupported operating system; please use Ubuntu or CentOS"
    - name: Update the apt cache
      apt:
        update_cache: yes
      when: "'Ubuntu' in os_info.stdout"
    - name: Update the yum cache
      yum:
        update_cache: yes
      when: "'CentOS' in os_info.stdout"
    - name: Install base dependencies (apt)
      apt:
        name: "{{ dfs_packages }}"
        state: present
      when: "'Ubuntu' in os_info.stdout"
    - name: Install base dependencies (yum)
      yum:
        name: "{{ dfs_packages }}"
        state: present
      when: "'CentOS' in os_info.stdout"
    - name: Create the DFS group
      group:
        name: "{{ dfs_group }}"
        system: yes
    - name: Create the DFS user
      user:
        name: "{{ dfs_user }}"
        group: "{{ dfs_group }}"
        system: yes
        home: "{{ dfs_install_path }}"
        shell: /bin/bash
        create_home: no
    - name: Create the installation directory
      file:
        path: "{{ dfs_install_path }}"
        state: directory
        owner: "{{ dfs_user }}"
        group: "{{ dfs_group }}"
        mode: '0755'
    - name: Create the data directory
      file:
        path: "{{ dfs_data_path }}"
        state: directory
        owner: "{{ dfs_user }}"
        group: "{{ dfs_group }}"
        mode: '0755'
    - name: Authorize the control node's SSH key for the DFS user
      authorized_key:
        user: "{{ dfs_user }}"
        key: "{{ lookup('file', '~/.ssh/id_rsa.pub') }}"
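Before this playbook touches real machines, it can be validated offline; --syntax-check parses it without contacting any host, and --check performs a dry run:
# Parse the playbook without contacting any host
ansible-playbook playbooks/prepare-environment.yml --syntax-check
# Dry run: report what would change without changing anything
ansible-playbook playbooks/prepare-environment.yml --check --diff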
8.2.1.3 Core Services Deployment Playbook
# playbooks/deploy-core-services.yml - core services deployment playbook
---
- name: Deploy the distributed file storage core services
  hosts: all
  become: yes
  vars:
    dfs_config_template: "templates/dfs.conf.j2"
    dfs_service_template: "templates/dfs.service.j2"
  tasks:
    - name: Download the DFS release tarball
      get_url:
        url: "https://example.com/dfs/releases/dfs-{{ dfs_version }}.tar.gz"
        dest: "/tmp/dfs-{{ dfs_version }}.tar.gz"
        mode: '0644'
        timeout: 300
      become: no
      delegate_to: localhost
      run_once: true
    - name: Distribute the tarball to all nodes
      copy:
        src: "/tmp/dfs-{{ dfs_version }}.tar.gz"
        dest: "/tmp/dfs-{{ dfs_version }}.tar.gz"
        mode: '0644'
    - name: Unpack the tarball
      unarchive:
        src: "/tmp/dfs-{{ dfs_version }}.tar.gz"
        dest: "{{ dfs_install_path }}"
        owner: "{{ dfs_user }}"
        group: "{{ dfs_group }}"
        remote_src: yes
        extra_opts: [--strip-components=1]
    - name: Render the configuration file
      template:
        src: "{{ dfs_config_template }}"
        dest: "{{ dfs_install_path }}/conf/dfs.conf"
        owner: "{{ dfs_user }}"
        group: "{{ dfs_group }}"
        mode: '0644'
      notify: Restart DFS service
    - name: Render the systemd unit file
      template:
        src: "{{ dfs_service_template }}"
        dest: "/etc/systemd/system/dfs.service"
        mode: '0644'
      notify: Restart DFS service
    - name: Enable and start the DFS service
      systemd:
        name: dfs
        enabled: yes
        state: started
        daemon_reload: yes
  handlers:
    - name: Restart DFS service
      systemd:
        name: dfs
        state: restarted
8.2.1.4 Configuration File Templates
# templates/dfs.conf.j2 - DFS configuration file template
# Distributed file storage system configuration
# Generated at: {{ ansible_date_time.iso8601 }}
[cluster]
cluster_name = {{ cluster_name | default('dfs-cluster') }}
node_id = {{ inventory_hostname }}
data_path = {{ dfs_data_path }}
[network]
bind_address = {{ ansible_default_ipv4.address }}
bind_port = {{ dfs_port | default(8080) }}
advertise_address = {{ ansible_default_ipv4.address }}
[metadata]
# Metadata server configuration
{% if inventory_hostname in groups['metadata_servers'] %}
server_role = metadata
metadata_servers = {% for host in groups['metadata_servers'] %}{{ hostvars[host].ansible_default_ipv4.address }}:8080{% if not loop.last %},{% endif %}{% endfor %}
replication_factor = {{ metadata_replication_factor | default(3) }}
{% else %}
server_role = data
metadata_servers = {% for host in groups['metadata_servers'] %}{{ hostvars[host].ansible_default_ipv4.address }}:8080{% if not loop.last %},{% endif %}{% endfor %}
{% endif %}
[storage]
storage_paths = {{ dfs_data_path }}/volumes
max_disk_usage = {{ max_disk_usage | default('85%') }}
[performance]
thread_pool_size = {{ thread_pool_size | default(32) }}
max_connections = {{ max_connections | default(10000) }}
[logging]
log_level = {{ log_level | default('INFO') }}
log_path = {{ dfs_install_path }}/logs
# templates/dfs.service.j2 - DFS systemd service template
[Unit]
Description=Distributed File Storage Service
After=network.target
[Service]
Type=simple
User={{ dfs_user }}
Group={{ dfs_group }}
WorkingDirectory={{ dfs_install_path }}
ExecStart={{ dfs_install_path }}/bin/dfs-server
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
# Resource limits
LimitNOFILE=65536
LimitNPROC=32768
[Install]
WantedBy=multi-user.target
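Once the unit file has been rendered on a node, it can be sanity-checked before the service is relied upon; for example:
# Run on a target node after the template has been rendered
systemd-analyze verify /etc/systemd/system/dfs.service
systemctl status dfs --no-pager
journalctl -u dfs -n 50 --no-pager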
8.2.1.5 Deployment Script
#!/bin/bash
# deploy.sh - deployment script
set -e
# Color definitions
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Logging helpers
log_info() {
    echo -e "${GREEN}[INFO]${NC} $1"
}
log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}
log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}
# Check dependencies
check_dependencies() {
    log_info "Checking dependencies..."
    if ! command -v ansible &> /dev/null; then
        log_error "Ansible not found; please install Ansible first"
        exit 1
    fi
    if ! command -v ssh &> /dev/null; then
        log_error "SSH client not found"
        exit 1
    fi
    log_info "Dependency check passed"
}
# Validate configuration
validate_config() {
    log_info "Validating configuration..."
    if [ ! -f "inventory/hosts" ]; then
        log_error "Inventory file inventory/hosts not found"
        exit 1
    fi
    if [ ! -f "ansible.cfg" ]; then
        log_error "Ansible configuration file ansible.cfg not found"
        exit 1
    fi
    log_info "Configuration validation passed"
}
# Run the deployment
run_deployment() {
    log_info "Starting deployment..."
    # Prepare the environment
    log_info "Preparing the environment..."
    ansible-playbook playbooks/prepare-environment.yml
    # Deploy the core services
    log_info "Deploying core services..."
    ansible-playbook playbooks/deploy-core-services.yml
    # Verify the deployment
    log_info "Verifying the deployment..."
    ansible-playbook playbooks/verify-deployment.yml
    log_info "Deployment finished!"
}
# Main entry point
main() {
    log_info "Distributed file storage platform automated deployment script"
    log_info "===================================="
    check_dependencies
    validate_config
    run_deployment
    log_info "Deployment completed successfully!"
    echo ""
    echo "Next steps:"
    echo "1. Check service status: ansible all -m shell -a 'systemctl status dfs'"
    echo "2. Tail the logs: ansible all -m shell -a 'journalctl -u dfs -f'"
    echo "3. Verify the cluster: ansible metadata_servers[0] -m shell -a '/opt/dfs/bin/dfs-cli cluster status'"
}
# Run main
main "$@"
8.2.1.6 Verification Playbook
# playbooks/verify-deployment.yml - deployment verification playbook
---
- name: Verify the distributed file storage deployment
  hosts: all
  become: yes
  tasks:
    - name: Check the DFS service state
      systemd:
        name: dfs
        state: started
      register: service_status
    - name: Assert that the service is running
      assert:
        that:
          - service_status.status.ActiveState == "active"
          - service_status.status.SubState == "running"
        fail_msg: "The DFS service is not running properly"
        success_msg: "The DFS service is running"
    - name: Check the listening port
      wait_for:
        host: "{{ ansible_default_ipv4.address }}"
        port: "{{ dfs_port | default(8080) }}"
        state: started
        timeout: 30
      delegate_to: localhost
    - name: Verify the metadata server cluster state
      shell: "{{ dfs_install_path }}/bin/dfs-cli cluster status"
      args:
        chdir: "{{ dfs_install_path }}"
      register: cluster_status
      when: inventory_hostname == groups['metadata_servers'][0]
    - name: Show the cluster state
      debug:
        var: cluster_status.stdout
      when: inventory_hostname == groups['metadata_servers'][0]
    - name: Verify data node registration
      shell: "{{ dfs_install_path }}/bin/dfs-cli node list"
      args:
        chdir: "{{ dfs_install_path }}"
      register: node_list
      when: inventory_hostname == groups['metadata_servers'][0]
    - name: Check the number of registered data nodes
      assert:
        that:
          - node_list.stdout_lines | length >= groups['data_servers'] | length
        fail_msg: "Not all data nodes have registered"
        success_msg: "All data nodes registered successfully"
      when: inventory_hostname == groups['metadata_servers'][0]
8.2.2 Kubernetes Operator Deployment
As cloud-native technology has spread, Kubernetes has become the de facto standard for deploying containerized applications. For a distributed file storage platform, a Kubernetes Operator enables a higher level of automated management, including self-healing, rolling upgrades, and automatic scaling.
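The Operator in this section follows the standard Kubebuilder project layout. As a rough sketch, a project like this would be scaffolded with commands along the following lines (the --repo module path is an assumption matching the import path used in the controller below):
# Scaffold a Kubebuilder project for the DFS Operator
kubebuilder init --domain example.com --repo dfs-operator
# Generate the DFSCluster API types and controller skeleton
kubebuilder create api --group dfs --version v1 --kind DFSCluster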
8.2.2.1 Custom Resource Definition (CRD)
# config/crd/bases/dfs.example.com_dfsclusters.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: dfsclusters.dfs.example.com
spec:
group: dfs.example.com
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
version:
type: string
                description: DFS version
replicas:
type: object
properties:
metadata:
type: integer
minimum: 1
data:
type: integer
minimum: 1
required: ["metadata", "data"]
storage:
type: object
properties:
size:
type: string
pattern: '^[0-9]+[KMGT]i$'
storageClass:
type: string
required: ["size"]
resources:
type: object
properties:
metadata:
type: object
properties:
requests:
type: object
properties:
cpu:
type: string
memory:
type: string
limits:
type: object
properties:
cpu:
type: string
memory:
type: string
data:
type: object
properties:
requests:
type: object
properties:
cpu:
type: string
memory:
type: string
limits:
type: object
properties:
cpu:
type: string
memory:
type: string
config:
type: object
properties:
logLevel:
type: string
enum: ["DEBUG", "INFO", "WARN", "ERROR"]
maxConnections:
type: integer
minimum: 1
replicationFactor:
type: integer
minimum: 1
maximum: 5
required: ["version", "replicas", "storage"]
status:
type: object
properties:
phase:
type: string
enum: ["Pending", "Running", "Failed", "Terminating"]
nodes:
type: array
items:
type: object
properties:
name:
type: string
role:
type: string
enum: ["metadata", "data"]
status:
type: string
enum: ["Ready", "NotReady", "Unknown"]
address:
type: string
scope: Namespaced
names:
plural: dfsclusters
singular: dfscluster
kind: DFSCluster
listKind: DFSClusterList
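After the CRD is applied, the API server validates every DFSCluster object against the schema above. A quick way to install it and browse the generated schema:
# Install the CRD and inspect the schema it exposes
kubectl apply -f config/crd/bases/dfs.example.com_dfsclusters.yaml
kubectl explain dfscluster.spec --recursive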
8.2.2.2 Operator Controller Implementation
// controllers/dfscluster_controller.go
package controllers
import (
"context"
"fmt"
"time"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
dfsv1 "dfs-operator/api/v1"
)
// DFSClusterReconciler reconciles a DFSCluster object
type DFSClusterReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
}
//+kubebuilder:rbac:groups=dfs.example.com,resources=dfsclusters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=dfs.example.com,resources=dfsclusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=dfs.example.com,resources=dfsclusters/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
func (r *DFSClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := r.Log.WithValues("dfscluster", req.NamespacedName)
	// Fetch the DFSCluster instance
	dfsCluster := &dfsv1.DFSCluster{}
	if err := r.Get(ctx, req.NamespacedName, dfsCluster); err != nil {
		if errors.IsNotFound(err) {
			// The resource was deleted; nothing to do
			return ctrl.Result{}, nil
		}
		log.Error(err, "unable to fetch DFSCluster")
		return ctrl.Result{}, err
	}
	// Handle deletion
	if dfsCluster.DeletionTimestamp != nil {
		return r.handleDeletion(ctx, dfsCluster)
	}
	// Ensure our finalizer is present
	if !controllerutil.ContainsFinalizer(dfsCluster, "dfs.example.com/finalizer") {
		controllerutil.AddFinalizer(dfsCluster, "dfs.example.com/finalizer")
		if err := r.Update(ctx, dfsCluster); err != nil {
			return ctrl.Result{}, err
		}
	}
	// Deploy the metadata service
	if err := r.deployMetadataService(ctx, dfsCluster); err != nil {
		log.Error(err, "failed to deploy metadata service")
		return ctrl.Result{}, err
	}
	// Deploy the data service
	if err := r.deployDataService(ctx, dfsCluster); err != nil {
		log.Error(err, "failed to deploy data service")
		return ctrl.Result{}, err
	}
	// Update the status
	if err := r.updateStatus(ctx, dfsCluster); err != nil {
		log.Error(err, "failed to update status")
		return ctrl.Result{RequeueAfter: time.Second * 10}, err
	}
	return ctrl.Result{}, nil
}
func (r *DFSClusterReconciler) deployMetadataService(ctx context.Context, dfsCluster *dfsv1.DFSCluster) error {
	log := r.Log.WithValues("dfscluster", fmt.Sprintf("%s/%s", dfsCluster.Namespace, dfsCluster.Name))
	// Create a headless Service for the StatefulSet
	metadataSvc := r.metadataService(dfsCluster)
	if err := controllerutil.SetControllerReference(dfsCluster, metadataSvc, r.Scheme); err != nil {
		return err
	}
	foundSvc := &corev1.Service{}
	err := r.Get(ctx, types.NamespacedName{Name: metadataSvc.Name, Namespace: metadataSvc.Namespace}, foundSvc)
	if err != nil && errors.IsNotFound(err) {
		log.Info("creating metadata Service", "Service.Name", metadataSvc.Name)
		if err = r.Create(ctx, metadataSvc); err != nil {
			return err
		}
	} else if err != nil {
		return err
	}
	// Create the StatefulSet
	metadataSts := r.metadataStatefulSet(dfsCluster)
	if err := controllerutil.SetControllerReference(dfsCluster, metadataSts, r.Scheme); err != nil {
		return err
	}
	foundSts := &appsv1.StatefulSet{}
	err = r.Get(ctx, types.NamespacedName{Name: metadataSts.Name, Namespace: metadataSts.Namespace}, foundSts)
	if err != nil && errors.IsNotFound(err) {
		log.Info("creating metadata StatefulSet", "StatefulSet.Name", metadataSts.Name)
		if err = r.Create(ctx, metadataSts); err != nil {
			return err
		}
	} else if err != nil {
		return err
	}
	return nil
}
func (r *DFSClusterReconciler) deployDataService(ctx context.Context, dfsCluster *dfsv1.DFSCluster) error {
	log := r.Log.WithValues("dfscluster", fmt.Sprintf("%s/%s", dfsCluster.Namespace, dfsCluster.Name))
	// Create a headless Service
	dataSvc := r.dataService(dfsCluster)
	if err := controllerutil.SetControllerReference(dfsCluster, dataSvc, r.Scheme); err != nil {
		return err
	}
	foundSvc := &corev1.Service{}
	err := r.Get(ctx, types.NamespacedName{Name: dataSvc.Name, Namespace: dataSvc.Namespace}, foundSvc)
	if err != nil && errors.IsNotFound(err) {
		log.Info("creating data Service", "Service.Name", dataSvc.Name)
		if err = r.Create(ctx, dataSvc); err != nil {
			return err
		}
	} else if err != nil {
		return err
	}
	// Create the StatefulSet
	dataSts := r.dataStatefulSet(dfsCluster)
	if err := controllerutil.SetControllerReference(dfsCluster, dataSts, r.Scheme); err != nil {
		return err
	}
	foundSts := &appsv1.StatefulSet{}
	err = r.Get(ctx, types.NamespacedName{Name: dataSts.Name, Namespace: dataSts.Namespace}, foundSts)
	if err != nil && errors.IsNotFound(err) {
		log.Info("creating data StatefulSet", "StatefulSet.Name", dataSts.Name)
		if err = r.Create(ctx, dataSts); err != nil {
			return err
		}
	} else if err != nil {
		return err
	}
	return nil
}
func (r *DFSClusterReconciler) metadataService(dfsCluster *dfsv1.DFSCluster) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-metadata-svc", dfsCluster.Name),
Namespace: dfsCluster.Namespace,
},
Spec: corev1.ServiceSpec{
ClusterIP: "None",
Selector: map[string]string{
"app": fmt.Sprintf("%s-metadata", dfsCluster.Name),
},
Ports: []corev1.ServicePort{
{
Name: "rpc",
Port: 8080,
TargetPort: intstr.FromInt(8080),
Protocol: corev1.ProtocolTCP,
},
{
Name: "metrics",
Port: 8081,
TargetPort: intstr.FromInt(8081),
Protocol: corev1.ProtocolTCP,
},
},
},
}
}
func (r *DFSClusterReconciler) dataService(dfsCluster *dfsv1.DFSCluster) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-data-svc", dfsCluster.Name),
Namespace: dfsCluster.Namespace,
},
Spec: corev1.ServiceSpec{
ClusterIP: "None",
Selector: map[string]string{
"app": fmt.Sprintf("%s-data", dfsCluster.Name),
},
Ports: []corev1.ServicePort{
{
Name: "rpc",
Port: 8080,
TargetPort: intstr.FromInt(8080),
Protocol: corev1.ProtocolTCP,
},
{
Name: "metrics",
Port: 8081,
TargetPort: intstr.FromInt(8081),
Protocol: corev1.ProtocolTCP,
},
},
},
}
}
func (r *DFSClusterReconciler) metadataStatefulSet(dfsCluster *dfsv1.DFSCluster) *appsv1.StatefulSet {
	replicas := int32(dfsCluster.Spec.Replicas.Metadata)
	return &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-metadata", dfsCluster.Name),
			Namespace: dfsCluster.Namespace,
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas: &replicas,
			// Governing headless Service, required for stable pod DNS names
			ServiceName: fmt.Sprintf("%s-metadata-svc", dfsCluster.Name),
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app": fmt.Sprintf("%s-metadata", dfsCluster.Name),
				},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						"app": fmt.Sprintf("%s-metadata", dfsCluster.Name),
					},
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "metadata",
							Image: fmt.Sprintf("dfs/metadata:%s", dfsCluster.Spec.Version),
							Ports: []corev1.ContainerPort{
								{
									ContainerPort: 8080,
									Name:          "rpc",
								},
								{
									ContainerPort: 8081,
									Name:          "metrics",
								},
							},
							Env: []corev1.EnvVar{
								{
									Name: "NODE_NAME",
									ValueFrom: &corev1.EnvVarSource{
										FieldRef: &corev1.ObjectFieldSelector{
											FieldPath: "metadata.name",
										},
									},
								},
								{
									Name:  "CLUSTER_NAME",
									Value: dfsCluster.Name,
								},
								{
									Name:  "LOG_LEVEL",
									Value: dfsCluster.Spec.Config.LogLevel,
								},
								{
									Name:  "MAX_CONNECTIONS",
									Value: fmt.Sprintf("%d", dfsCluster.Spec.Config.MaxConnections),
								},
							},
							VolumeMounts: []corev1.VolumeMount{
								{
									Name:      "metadata-data",
									MountPath: "/data",
								},
							},
							Resources: r.getMetadataResources(dfsCluster),
						},
					},
				},
			},
			// The StatefulSet controller creates one PVC per pod from this
			// template; its name must match the volume mount name above.
			VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "metadata-data",
					},
					Spec: corev1.PersistentVolumeClaimSpec{
						AccessModes: []corev1.PersistentVolumeAccessMode{
							corev1.ReadWriteOnce,
						},
						Resources: corev1.ResourceRequirements{
							Requests: corev1.ResourceList{
								corev1.ResourceStorage: resource.MustParse(dfsCluster.Spec.Storage.Size),
							},
						},
						StorageClassName: &dfsCluster.Spec.Storage.StorageClass,
					},
				},
			},
		},
	}
}
func (r *DFSClusterReconciler) dataStatefulSet(dfsCluster *dfsv1.DFSCluster) *appsv1.StatefulSet {
	replicas := int32(dfsCluster.Spec.Replicas.Data)
	return &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-data", dfsCluster.Name),
			Namespace: dfsCluster.Namespace,
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas: &replicas,
			// Governing headless Service, required for stable pod DNS names
			ServiceName: fmt.Sprintf("%s-data-svc", dfsCluster.Name),
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app": fmt.Sprintf("%s-data", dfsCluster.Name),
				},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						"app": fmt.Sprintf("%s-data", dfsCluster.Name),
					},
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "data",
							Image: fmt.Sprintf("dfs/data:%s", dfsCluster.Spec.Version),
							Ports: []corev1.ContainerPort{
								{
									ContainerPort: 8080,
									Name:          "rpc",
								},
								{
									ContainerPort: 8081,
									Name:          "metrics",
								},
							},
							Env: []corev1.EnvVar{
								{
									Name: "NODE_NAME",
									ValueFrom: &corev1.EnvVarSource{
										FieldRef: &corev1.ObjectFieldSelector{
											FieldPath: "metadata.name",
										},
									},
								},
								{
									Name:  "CLUSTER_NAME",
									Value: dfsCluster.Name,
								},
								{
									Name:  "LOG_LEVEL",
									Value: dfsCluster.Spec.Config.LogLevel,
								},
								{
									Name:  "REPLICATION_FACTOR",
									Value: fmt.Sprintf("%d", dfsCluster.Spec.Config.ReplicationFactor),
								},
							},
							VolumeMounts: []corev1.VolumeMount{
								{
									Name:      "data-storage",
									MountPath: "/data",
								},
							},
							Resources: r.getDataResources(dfsCluster),
						},
					},
				},
			},
			// One PVC per pod, named to match the volume mount above
			VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "data-storage",
					},
					Spec: corev1.PersistentVolumeClaimSpec{
						AccessModes: []corev1.PersistentVolumeAccessMode{
							corev1.ReadWriteOnce,
						},
						Resources: corev1.ResourceRequirements{
							Requests: corev1.ResourceList{
								corev1.ResourceStorage: resource.MustParse(dfsCluster.Spec.Storage.Size),
							},
						},
						StorageClassName: &dfsCluster.Spec.Storage.StorageClass,
					},
				},
			},
		},
	}
}
func (r *DFSClusterReconciler) getMetadataResources(dfsCluster *dfsv1.DFSCluster) corev1.ResourceRequirements {
if dfsCluster.Spec.Resources.Metadata.Requests != nil || dfsCluster.Spec.Resources.Metadata.Limits != nil {
return corev1.ResourceRequirements{
Requests: dfsCluster.Spec.Resources.Metadata.Requests,
Limits: dfsCluster.Spec.Resources.Metadata.Limits,
}
}
	// Default resource requirements
return corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("4Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
},
}
}
func (r *DFSClusterReconciler) getDataResources(dfsCluster *dfsv1.DFSCluster) corev1.ResourceRequirements {
if dfsCluster.Spec.Resources.Data.Requests != nil || dfsCluster.Spec.Resources.Data.Limits != nil {
return corev1.ResourceRequirements{
Requests: dfsCluster.Spec.Resources.Data.Requests,
Limits: dfsCluster.Spec.Resources.Data.Limits,
}
}
	// Default resource requirements
return corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("8"),
corev1.ResourceMemory: resource.MustParse("16Gi"),
},
}
}
func (r *DFSClusterReconciler) updateStatus(ctx context.Context, dfsCluster *dfsv1.DFSCluster) error {
	// A full implementation would inspect the owned StatefulSets and
	// populate Status.Nodes; for brevity we only set the phase here.
	dfsCluster.Status.Phase = "Running"
	return r.Status().Update(ctx, dfsCluster)
}
func (r *DFSClusterReconciler) handleDeletion(ctx context.Context, dfsCluster *dfsv1.DFSCluster) (ctrl.Result, error) {
	// Clean up before the object is removed
	if controllerutil.ContainsFinalizer(dfsCluster, "dfs.example.com/finalizer") {
		// Perform cleanup here, e.g. delete external resources or back up data
		// Remove the finalizer so deletion can proceed
		controllerutil.RemoveFinalizer(dfsCluster, "dfs.example.com/finalizer")
		if err := r.Update(ctx, dfsCluster); err != nil {
			return ctrl.Result{}, err
		}
	}
	return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *DFSClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&dfsv1.DFSCluster{}).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Service{}).
Complete(r)
}
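In a Kubebuilder project, the controller is typically built and exercised against the current kubeconfig through the generated Makefile; the target names below are the Kubebuilder defaults and may differ in a customized project:
# Regenerate CRDs and RBAC manifests after changing API types or markers
make manifests generate
# Install the CRDs, then run the controller locally against the current kubeconfig
make install
make run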
8.2.2.3 Custom Resource Example
# config/samples/dfs_v1_dfscluster.yaml
apiVersion: dfs.example.com/v1
kind: DFSCluster
metadata:
name: production-cluster
namespace: dfs-system
spec:
version: "2.1.0"
replicas:
metadata: 3
data: 5
storage:
size: "1000Gi"
storageClass: "fast-ssd"
resources:
metadata:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "4"
memory: "8Gi"
data:
requests:
cpu: "4"
memory: "8Gi"
limits:
cpu: "8"
memory: "16Gi"
config:
logLevel: "INFO"
maxConnections: 10000
replicationFactor: 3
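Applying this sample should lead the Operator to create two StatefulSets and their headless Services; the resource names below follow the controller's naming scheme shown earlier:
kubectl apply -f config/samples/dfs_v1_dfscluster.yaml
# Watch the custom resource and the StatefulSets it produces
kubectl get dfscluster production-cluster -n dfs-system -w
kubectl rollout status statefulset/production-cluster-metadata -n dfs-system
kubectl rollout status statefulset/production-cluster-data -n dfs-system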
8.2.2.4 Operator Deployment Script
#!/bin/bash
# deploy-operator.sh - Operator deployment script
set -e
# Color definitions
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Logging helpers
log_info() {
    echo -e "${GREEN}[INFO]${NC} $1"
}
log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}
log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}
# Check the Kubernetes cluster connection
check_k8s_connection() {
    log_info "Checking Kubernetes cluster connection..."
    if ! command -v kubectl &> /dev/null; then
        log_error "kubectl command not found"
        exit 1
    fi
    if ! kubectl cluster-info &> /dev/null; then
        log_error "Unable to connect to the Kubernetes cluster"
        exit 1
    fi
    log_info "Kubernetes cluster connection OK"
}
# Install the CRDs
install_crds() {
    log_info "Installing Custom Resource Definitions..."
    kubectl apply -f config/crd/bases/
    log_info "CRD installation complete"
}
# Deploy the Operator
deploy_operator() {
    log_info "Deploying the DFS Operator..."
    # Create the namespace
    kubectl create namespace dfs-system --dry-run=client -o yaml | kubectl apply -f -
    # Deploy the Operator
    kubectl apply -f config/manager/manager.yaml -n dfs-system
    # Wait for the Operator to become ready
    log_info "Waiting for the Operator to become ready..."
    kubectl wait --for=condition=available deployment/dfs-operator-controller-manager -n dfs-system --timeout=300s
    log_info "Operator deployment complete"
}
# Deploy RBAC
deploy_rbac() {
    log_info "Applying RBAC rules..."
    kubectl apply -f config/rbac/ -n dfs-system
    log_info "RBAC deployment complete"
}
# Deploy webhooks (if present)
deploy_webhooks() {
    log_info "Applying webhook configuration..."
    # Check whether webhooks are configured
    if [ -d "config/webhook" ]; then
        kubectl apply -f config/webhook/ -n dfs-system
        log_info "Webhook deployment complete"
    else
        log_info "No webhook configuration found; skipping"
    fi
}
# Verify the deployment
verify_deployment() {
    log_info "Verifying the deployment..."
    # Check the CRD
    if ! kubectl get crd dfsclusters.dfs.example.com &> /dev/null; then
        log_error "CRD is not installed correctly"
        exit 1
    fi
    # Check the Operator
    if ! kubectl get deployment dfs-operator-controller-manager -n dfs-system &> /dev/null; then
        log_error "Operator is not deployed correctly"
        exit 1
    fi
    # Show pod status
    kubectl get pods -n dfs-system
    log_info "Deployment verification passed"
}
# Main entry point
main() {
    log_info "DFS Operator deployment script"
    log_info "===================="
    check_k8s_connection
    install_crds
    deploy_rbac
    deploy_operator
    deploy_webhooks
    verify_deployment
    log_info "DFS Operator deployed successfully!"
    echo ""
    echo "Next steps:"
    echo "1. Deploy a DFS cluster: kubectl apply -f config/samples/dfs_v1_dfscluster.yaml"
    echo "2. Check cluster status: kubectl get dfscluster -n dfs-system"
    echo "3. Check pod status: kubectl get pods -n dfs-system"
}
# Run main
main "$@"
8.2.2.5 Cluster Management Script
#!/bin/bash
# dfs-cluster-manager.sh - DFS cluster management script
set -e
# Color definitions
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Global variables
NAMESPACE="dfs-system"
CLUSTER_NAME=""
# Logging helpers
log_info() {
echo -e "${GREEN}[INFO]${NC} $1"
}
log_warn() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
log_debug() {
echo -e "${BLUE}[DEBUG]${NC} $1"
}
# Show usage information
show_help() {
    echo "DFS cluster management tool"
    echo "Usage: $0 [options] command"
    echo ""
    echo "Commands:"
    echo "  create <name>    Create a DFS cluster"
    echo "  delete <name>    Delete a DFS cluster"
    echo "  status <name>    Show DFS cluster status"
    echo "  scale <name>     Scale a DFS cluster"
    echo "  upgrade <name>   Upgrade a DFS cluster"
    echo "  list             List all DFS clusters"
    echo ""
    echo "Options:"
    echo "  -n, --namespace  Namespace to use (default: dfs-system)"
    echo "  -h, --help       Show this help message"
}
# Parse command-line arguments
parse_args() {
while [[ $# -gt 0 ]]; do
case $1 in
-n|--namespace)
NAMESPACE="$2"
shift 2
;;
-h|--help)
show_help
exit 0
;;
create|delete|status|scale|upgrade|list)
COMMAND="$1"
if [[ "$1" != "list" ]]; then
CLUSTER_NAME="$2"
shift 2
else
shift
fi
;;
*)
log_error "未知参数: $1"
show_help
exit 1
;;
esac
done
}
# Create a cluster
create_cluster() {
    local name=$1
    if [[ -z "$name" ]]; then
        log_error "Please specify a cluster name"
        exit 1
    fi
    log_info "Creating DFS cluster: $name"
    # Generate the cluster manifest
    cat <<EOF | kubectl apply -f -
apiVersion: dfs.example.com/v1
kind: DFSCluster
metadata:
  name: $name
  namespace: $NAMESPACE
spec:
  version: "2.1.0"
  replicas:
    metadata: 3
    data: 3
  storage:
    size: "100Gi"
    storageClass: "default"
  config:
    logLevel: "INFO"
    maxConnections: 1000
    replicationFactor: 3
EOF
    log_info "Cluster creation request submitted"
    log_info "Waiting for the cluster to become ready..."
    # Wait for the cluster to become ready
    for i in {1..30}; do
        sleep 10
        local status=$(kubectl get dfscluster $name -n $NAMESPACE -o jsonpath='{.status.phase}' 2>/dev/null || echo "Unknown")
        if [[ "$status" == "Running" ]]; then
            log_info "Cluster is ready"
            return 0
        fi
        log_debug "Cluster status: $status"
    done
    log_warn "The cluster may still be initializing; check its status again later"
}
# Delete a cluster
delete_cluster() {
    local name=$1
    if [[ -z "$name" ]]; then
        log_error "Please specify a cluster name"
        exit 1
    fi
    log_info "Deleting DFS cluster: $name"
    kubectl delete dfscluster $name -n $NAMESPACE
    log_info "Cluster deletion request submitted"
}
# Show cluster status
show_status() {
    local name=$1
    if [[ -z "$name" ]]; then
        log_error "Please specify a cluster name"
        exit 1
    fi
    log_info "DFS cluster status: $name"
    echo ""
    # Show basic cluster information
    kubectl get dfscluster $name -n $NAMESPACE -o wide
    echo ""
    # Show pod status
    log_info "Pod status:"
    kubectl get pods -n $NAMESPACE -l app=$name-metadata
    kubectl get pods -n $NAMESPACE -l app=$name-data
    echo ""
    # Show service status (the controller names the services <name>-metadata-svc and <name>-data-svc)
    log_info "Service status:"
    kubectl get service $name-metadata-svc $name-data-svc -n $NAMESPACE
}
# Scale a cluster
scale_cluster() {
    local name=$1
    if [[ -z "$name" ]]; then
        log_error "Please specify a cluster name"
        exit 1
    fi
    log_info "Scaling DFS cluster: $name"
    # Read the current configuration
    local current_metadata=$(kubectl get dfscluster $name -n $NAMESPACE -o jsonpath='{.spec.replicas.metadata}')
    local current_data=$(kubectl get dfscluster $name -n $NAMESPACE -o jsonpath='{.spec.replicas.data}')
    log_info "Current configuration: $current_metadata metadata nodes, $current_data data nodes"
    # Prompt for the new configuration
    read -p "New metadata node count [$current_metadata]: " new_metadata
    new_metadata=${new_metadata:-$current_metadata}
    read -p "New data node count [$current_data]: " new_data
    new_data=${new_data:-$current_data}
    # Patch the resource
    kubectl patch dfscluster $name -n $NAMESPACE --type='merge' -p \
        "{\"spec\":{\"replicas\":{\"metadata\":$new_metadata,\"data\":$new_data}}}"
    log_info "Cluster scaling request submitted"
    log_info "Waiting for the cluster update to complete..."
}
# Upgrade a cluster
upgrade_cluster() {
    local name=$1
    if [[ -z "$name" ]]; then
        log_error "Please specify a cluster name"
        exit 1
    fi
    log_info "Upgrading DFS cluster: $name"
    # Read the current version
    local current_version=$(kubectl get dfscluster $name -n $NAMESPACE -o jsonpath='{.spec.version}')
    log_info "Current version: $current_version"
    # List available versions (hard-coded for illustration)
    local available_versions=("2.1.0" "2.1.1" "2.2.0" "2.3.0")
    log_info "Available versions: ${available_versions[*]}"
    # Prompt for the new version
    read -p "Select a new version: " new_version
    # Validate the version
    if [[ ! " ${available_versions[*]} " =~ " ${new_version} " ]]; then
        log_error "Invalid version: $new_version"
        exit 1
    fi
    # Apply the upgrade
    kubectl patch dfscluster $name -n $NAMESPACE --type='merge' -p \
        "{\"spec\":{\"version\":\"$new_version\"}}"
    log_info "Cluster upgrade request submitted"
    log_info "The upgrade runs in the background; monitor the cluster status"
}
# List all clusters
list_clusters() {
    log_info "DFS clusters:"
    echo ""
    kubectl get dfscluster -A
}
# Main entry point
main() {
    parse_args "$@"
    if [[ -z "$COMMAND" ]]; then
        log_error "Please specify a command"
        show_help
        exit 1
    fi
case $COMMAND in
create)
create_cluster "$CLUSTER_NAME"
;;
delete)
delete_cluster "$CLUSTER_NAME"
;;
status)
show_status "$CLUSTER_NAME"
;;
scale)
scale_cluster "$CLUSTER_NAME"
;;
upgrade)
upgrade_cluster "$CLUSTER_NAME"
;;
list)
list_clusters
;;
*)
log_error "未知命令: $COMMAND"
show_help
exit 1
;;
esac
}
# Run main
main "$@"
8.2.3 Deployment Monitoring and Alerting
8.2.3.1 Prometheus Monitoring Integration
# config/monitoring/prometheus-rules.yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
name: dfs-cluster-rules
namespace: dfs-system
spec:
groups:
- name: dfs.cluster.rules
rules:
- alert: DFSClusterDown
expr: absent(up{job="dfs-cluster"} == 1)
for: 5m
labels:
severity: critical
annotations:
summary: "DFS集群宕机"
description: "DFS集群中的所有节点都已宕机超过5分钟"
- alert: DFSClusterDegraded
expr: count(up{job="dfs-cluster"} == 0) > 0
for: 2m
labels:
severity: warning
annotations:
summary: "DFS集群降级"
description: "DFS集群中有节点宕机"
- alert: DFSClusterHighDiskUsage
expr: dfs_disk_usage_percent > 85
for: 5m
labels:
severity: warning
annotations:
summary: "DFS磁盘使用率过高"
description: "DFS节点磁盘使用率超过85%"
- alert: DFSClusterHighMemoryUsage
expr: dfs_memory_usage_percent > 90
for: 5m
labels:
severity: warning
annotations:
summary: "DFS内存使用率过高"
description: "DFS节点内存使用率超过90%"
- alert: DFSClusterHighCPUUsage
expr: dfs_cpu_usage_percent > 90
for: 5m
labels:
severity: warning
annotations:
summary: "DFS CPU使用率过高"
description: "DFS节点CPU使用率超过90%"
8.2.3.2 Grafana Dashboard
{
"dashboard": {
"id": null,
"title": "DFS Cluster Overview",
"timezone": "browser",
"schemaVersion": 16,
"version": 0,
"refresh": "30s",
"panels": [
{
"id": 1,
"type": "graph",
"title": "集群状态",
"gridPos": {
"x": 0,
"y": 0,
"w": 12,
"h": 6
},
"targets": [
{
"expr": "up{job=\"dfs-cluster\"}",
"legendFormat": "{{instance}}",
"refId": "A"
}
],
"yaxes": [
{
"format": "short",
"label": "状态",
"min": 0,
"max": 1
},
{
"format": "short"
}
]
},
{
"id": 2,
"type": "stat",
"title": "在线节点数",
"gridPos": {
"x": 12,
"y": 0,
"w": 6,
"h": 3
},
"targets": [
{
"expr": "count(up{job=\"dfs-cluster\"} == 1)",
"refId": "A"
}
],
"options": {
"colorMode": "value",
"graphMode": "none",
"justifyMode": "auto",
"orientation": "horizontal",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
}
}
},
{
"id": 3,
"type": "stat",
"title": "总存储容量",
"gridPos": {
"x": 18,
"y": 0,
"w": 6,
"h": 3
},
"targets": [
{
"expr": "sum(dfs_total_storage_bytes)",
"refId": "A"
}
],
"options": {
"colorMode": "value",
"graphMode": "none",
"justifyMode": "auto",
"orientation": "horizontal",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"textMode": "auto"
},
"fieldConfig": {
"defaults": {
"unit": "bytes"
}
}
},
{
"id": 4,
"type": "graph",
"title": "磁盘使用率",
"gridPos": {
"x": 0,
"y": 6,
"w": 12,
"h": 6
},
"targets": [
{
"expr": "dfs_disk_usage_percent",
"legendFormat": "{{instance}}",
"refId": "A"
}
],
"yaxes": [
{
"format": "percent",
"label": "使用率",
"min": 0,
"max": 100
},
{
"format": "short"
}
]
},
{
"id": 5,
"type": "graph",
"title": "网络I/O",
"gridPos": {
"x": 12,
"y": 6,
"w": 12,
"h": 6
},
"targets": [
{
"expr": "rate(dfs_network_bytes_received[5m])",
"legendFormat": "{{instance}} 接收",
"refId": "A"
},
{
"expr": "rate(dfs_network_bytes_transmitted[5m])",
"legendFormat": "{{instance}} 发送",
"refId": "B"
}
],
"yaxes": [
{
"format": "Bps",
"label": "流量"
},
{
"format": "short"
}
]
}
]
}
}
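Because this JSON already wraps the panels in a top-level dashboard object, it matches the payload expected by Grafana's HTTP import API; GRAFANA_URL and GRAFANA_API_KEY below are placeholders:
# Import the dashboard through the Grafana HTTP API
curl -s -X POST "$GRAFANA_URL/api/dashboards/db" \
  -H "Authorization: Bearer $GRAFANA_API_KEY" \
  -H "Content-Type: application/json" \
  -d @dfs-dashboard.json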
Summary
Automated deployment is a core practice of modern distributed-systems operations. With the two approaches covered here, Ansible and Kubernetes Operators, we can automate deployment all the way from traditional infrastructure to cloud-native environments.
The Ansible approach suits traditional virtual machine or bare-metal deployments: it is simple to configure and easy to understand and maintain. The Kubernetes Operator approach is a better fit for cloud-native environments, offering a higher level of automated management, including self-healing, rolling upgrades, and automatic scaling.
In practice, we should choose the approach that matches the deployment environment and business requirements, or combine the strengths of both in a hybrid architecture. Solid monitoring and alerting are equally important safeguards for stable operation.
Systematic automated deployment significantly improves deployment efficiency, lowers operational cost, and guarantees consistent, repeatable environments, laying a solid foundation for the stable operation of a distributed file storage platform.