make node/pod labels configurable
Some checks failed
Gitea/syncthingcsi/pipeline/head There was a failure building this commit

This commit is contained in:
dave 2024-08-07 15:18:49 -07:00
parent cf7741c72a
commit e36513d59a
13 changed files with 124 additions and 54 deletions

View File

@ -63,7 +63,6 @@ Development
TODO list
---------
- get the image on docker hub
- review TODOs in code and below
- high-level command interface
- ShareFolderToNode() etc

View File

@ -145,6 +145,16 @@ resource "kubernetes_deployment_v1" "controller" {
limits = var.csi_resources.limits
}
env {
name = "STCSI_DRIVER_NAME"
value = var.driver_name
}
env {
name = "STCSI_NODE_LABEL"
value = var.node_label
}
env {
name = "CSI_ENDPOINT"
value = "unix:/shared/csi.sock"

View File

@ -32,6 +32,18 @@ variable "csi_image" {
default = "dockermirror:5000/dpedu/syncthing-csi"
}
variable "driver_name" {
type = string
description = "csi driver name. You really don't want to change this after initial installation."
default = "syncthing.csi.davepedu.com"
}
variable "node_label" {
type = string
description = "restrict operation to nodes matching this label"
default = ""
}
resource "kubernetes_csi_driver_v1" "syncthing" {
metadata {

View File

@ -272,6 +272,11 @@ resource "kubernetes_daemon_set_v1" "syncthing" {
limits = var.introducer_resources.limits
}
env {
name = "STCSI_NODE_LABEL"
value = var.node_label
}
env {
name = "KUBERNETES_NAMESPACE"
value_from {
@ -332,6 +337,16 @@ resource "kubernetes_daemon_set_v1" "syncthing" {
limits = var.csi_resources.limits
}
env {
name = "STCSI_DRIVER_NAME"
value = var.driver_name
}
env {
name = "STCSI_NODE_LABEL"
value = var.node_label
}
env {
name = "CSI_ENDPOINT"
value = "unix:/csi/csi.sock"
@ -438,7 +453,6 @@ resource "kubernetes_daemon_set_v1" "syncthing" {
}
}
volume {
name = "plugin-dir"
host_path {

View File

@ -6,6 +6,7 @@ import (
"git.davepedu.com/dave/syncthingcsi/pkg/driver/nodeapi"
"git.davepedu.com/dave/syncthingcsi/pkg/driverutil"
"git.davepedu.com/dave/syncthingcsi/pkg/stapi"
"git.davepedu.com/dave/syncthingcsi/pkg/stconst"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -71,7 +72,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
_, err := api.PutConfigFolder(stapi.Folder{
ID: req.Name,
Label: req.Name,
Path: filepath.Join(SyncthingVolumesRoot, req.Name),
Path: filepath.Join(stconst.SyncthingVolumesRoot, req.Name),
Paused: false,
Devices: deviceIds,
})
@ -204,7 +205,7 @@ func (d *Driver) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
_, err := api.PutConfigFolder(stapi.Folder{
ID: req.VolumeId,
Label: req.VolumeId,
Path: filepath.Join(SyncthingVolumesRoot, req.VolumeId),
Path: filepath.Join(stconst.SyncthingVolumesRoot, req.VolumeId),
Paused: false,
Devices: devices,
})

View File

@ -5,6 +5,7 @@ import (
"git.davepedu.com/dave/callticker"
"git.davepedu.com/dave/stringset"
"git.davepedu.com/dave/syncthingcsi/pkg/driverutil"
"git.davepedu.com/dave/syncthingcsi/pkg/stconst"
"k8s.io/klog/v2"
"math/rand"
"os"
@ -56,13 +57,13 @@ func (d *Driver) RequestDeleterRun() {
func (d *Driver) runDeleterOnce() error {
// safety check to ensure our destination is something sane
if HostDeletedVolumeDataDir == "" || HostDeletedVolumeDataDir == "/" {
return fmt.Errorf("invalid HostDeletedVolumeDataDir: '%s'", HostDeletedVolumeDataDir)
if stconst.HostDeletedVolumeDataDir == "" || stconst.HostDeletedVolumeDataDir == "/" {
return fmt.Errorf("invalid HostDeletedVolumeDataDir: '%s'", stconst.HostDeletedVolumeDataDir)
}
// create /var/syncthing-csi/data/deleted-volumes
if !pathExists(HostDeletedVolumeDataDir) {
err := os.MkdirAll(HostDeletedVolumeDataDir, 0644)
if !pathExists(stconst.HostDeletedVolumeDataDir) {
err := os.MkdirAll(stconst.HostDeletedVolumeDataDir, 0644)
if err != nil {
return fmt.Errorf("couldn't make deleted volumes dir: %s", err)
}
@ -72,7 +73,7 @@ func (d *Driver) runDeleterOnce() error {
// list the file system for a set of folders
fsVolumes := stringset.NewStringSet()
fsDirs, err := os.ReadDir(HostVolumeDataDir)
fsDirs, err := os.ReadDir(stconst.HostVolumeDataDir)
if err != nil {
return fmt.Errorf("couldn't list local filesystem folders: %s", err)
}
@ -102,8 +103,8 @@ func (d *Driver) runDeleterOnce() error {
// move the indicated volumes under the above dir
for _, del := range deletable.All() {
volumePath := filepath.Join(HostVolumeDataDir, del)
deletedVolumePath := filepath.Join(HostDeletedVolumeDataDir, del+"."+randomString(6))
volumePath := filepath.Join(stconst.HostVolumeDataDir, del)
deletedVolumePath := filepath.Join(stconst.HostDeletedVolumeDataDir, del+"."+randomString(6))
klog.V(4).Infof("deleter: moving %s to %s", volumePath, deletedVolumePath)
err := os.Rename(volumePath, deletedVolumePath)
if err != nil {
@ -115,7 +116,7 @@ func (d *Driver) runDeleterOnce() error {
unlock()
// delete stuff in the deleted-volumes dir
err = removeContents(HostDeletedVolumeDataDir)
err = removeContents(stconst.HostDeletedVolumeDataDir)
if err != nil {
return fmt.Errorf("unable to purge deleted volumes: %s", err)
}

View File

@ -7,6 +7,7 @@ import (
"git.davepedu.com/dave/syncthingcsi/pkg/driver/nodeapi"
"git.davepedu.com/dave/syncthingcsi/pkg/driverutil"
"git.davepedu.com/dave/syncthingcsi/pkg/stapi"
"git.davepedu.com/dave/syncthingcsi/pkg/stconst"
"github.com/container-storage-interface/spec/lib/go/csi"
efsutil "github.com/kubernetes-sigs/aws-efs-csi-driver/pkg/util"
"google.golang.org/grpc"
@ -17,10 +18,6 @@ import (
"time"
)
const driverName = "syncthing.csi.davepedu.com" // TODO optionally from env var
const driverVersion = "0.0.1"
const DeviceAnnotationKey = driverName + "/syncthing-device-id"
type Driver struct {
endpoint string // endpoint address we bind the grpc server too
nodeID string // the kubernetes node name we are running on, used for introspection
@ -149,12 +146,12 @@ func (d *Driver) clearSyncCache(folderName stapi.FolderID) {
}
func (d *Driver) setAnnotation(deviceID string) error {
if KubernetesPodName == "" {
if stconst.KubernetesPodName == "" {
panic("must set KUBERNETES_POD_NAME")
}
// get the pod
pod, err := d.client.CoreV1().Pods(d.namespace).Get(context.TODO(), KubernetesPodName, metav1.GetOptions{})
pod, err := d.client.CoreV1().Pods(d.namespace).Get(context.TODO(), stconst.KubernetesPodName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get pod: %s", err)
}
@ -164,7 +161,7 @@ func (d *Driver) setAnnotation(deviceID string) error {
pod.Annotations = make(map[string]string)
}
pod.Annotations[DeviceAnnotationKey] = deviceID
pod.Annotations[stconst.DeviceAnnotationKey] = deviceID
_, err = d.client.CoreV1().Pods(d.namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
if err != nil {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"git.davepedu.com/dave/syncthingcsi/pkg/driverutil"
"git.davepedu.com/dave/syncthingcsi/pkg/stapi"
"git.davepedu.com/dave/syncthingcsi/pkg/stconst"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -41,9 +42,9 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("bad volume capabilities: %s", capErr))
}
stPath := filepath.Join(VolumeDataDir, req.VolumeId)
hostStPath := filepath.Join(HostContainerBase, stPath)
hostTargetPath := filepath.Join(HostContainerBase, req.StagingTargetPath)
stPath := filepath.Join(stconst.VolumeDataDir, req.VolumeId)
hostStPath := filepath.Join(stconst.HostContainerBase, stPath)
hostTargetPath := filepath.Join(stconst.HostContainerBase, req.StagingTargetPath)
// get client to local node
api := driverutil.StApiFromLocalPod(d.syncthingApiKey)
@ -101,7 +102,7 @@ func (d *Driver) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolu
panic("StagingTargetPath is nil, there is a configuration error")
}
hostStagePath := filepath.Join(HostContainerBase, req.StagingTargetPath)
hostStagePath := filepath.Join(stconst.HostContainerBase, req.StagingTargetPath)
if pathExists(hostStagePath) {
// bind mount StagingTargetPath to TargetPath, where k8s wants it for the pod
isMounted, err := isMountpoint(hostStagePath)
@ -149,8 +150,8 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
}
// Make the TargetPath dir
hostTargetPath := filepath.Join(HostContainerBase, req.TargetPath)
hostStagePath := filepath.Join(HostContainerBase, req.StagingTargetPath)
hostTargetPath := filepath.Join(stconst.HostContainerBase, req.TargetPath)
hostStagePath := filepath.Join(stconst.HostContainerBase, req.StagingTargetPath)
if !pathExists(hostTargetPath) {
err := os.MkdirAll(hostTargetPath, 0644)
if err != nil {
@ -186,7 +187,7 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
klog.V(4).Infof("NodeUnpublishVolume: called with args %+v", req)
hostTargetPath := filepath.Join(HostContainerBase, req.TargetPath)
hostTargetPath := filepath.Join(stconst.HostContainerBase, req.TargetPath)
isMounted, err := isMountpoint(hostTargetPath)
if err != nil {
return nil, fmt.Errorf("couldn't check mounts: %s", err)

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"git.davepedu.com/dave/syncthingcsi/pkg/driver/nodeapi"
"git.davepedu.com/dave/syncthingcsi/pkg/stconst"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"k8s.io/klog/v2"
@ -34,8 +35,8 @@ func (d *Driver) runNodeApi(opts []grpc.ServerOption) {
// track sync progress
func (d *Driver) InitializeFolder(ctx context.Context, folder *nodeapi.Folder) (*emptypb.Empty, error) {
// verify the folder exists locally
stPath := filepath.Join(VolumeDataDir, *folder.Name)
stFolderPath := filepath.Join(HostContainerBase, stPath)
stPath := filepath.Join(stconst.VolumeDataDir, *folder.Name)
stFolderPath := filepath.Join(stconst.HostContainerBase, stPath)
if !pathExists(stFolderPath) {
return nil, fmt.Errorf("folder path doesn't exist")

View File

@ -3,6 +3,7 @@ package driver
import (
"context"
"fmt"
"git.davepedu.com/dave/syncthingcsi/pkg/stconst"
"github.com/jrhouston/k8slock"
"sync"
"time"
@ -39,7 +40,7 @@ func (n *NodeLocker) getLocker(nodeID string) (*k8slock.Locker, error) {
if !ok {
var err error
lock, err = k8slock.NewLocker(
SyncthingLeaseLockName,
stconst.SyncthingLeaseLockName,
k8slock.TTL(60*time.Second), // TODO use a shorter time and a wrapper around this lock library that continuously updates the lease
k8slock.InClusterConfig(),
k8slock.Namespace(n.namespace),

View File

@ -6,6 +6,7 @@ import (
"git.davepedu.com/dave/stringset"
"git.davepedu.com/dave/syncthingcsi/pkg/driverutil"
"git.davepedu.com/dave/syncthingcsi/pkg/stapi"
"git.davepedu.com/dave/syncthingcsi/pkg/stconst"
"k8s.io/klog/v2"
"path/filepath"
"time"
@ -155,7 +156,7 @@ func (d *Driver) runAcceptOfferedFolders(allOtherDeviceIds *stringset.StringSet,
_, err := api.PutConfigFolder(stapi.Folder{
ID: string(folderID),
Label: string(folderID),
Path: filepath.Join(SyncthingVolumesRoot, string(folderID)),
Path: filepath.Join(stconst.SyncthingVolumesRoot, string(folderID)),
Paused: false,
Devices: devices,
})

View File

@ -4,19 +4,16 @@ import (
"context"
"fmt"
"git.davepedu.com/dave/syncthingcsi/pkg/stapi"
"git.davepedu.com/dave/syncthingcsi/pkg/stconst"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"os"
"strings"
)
// this file contains functions for interacting with synchting pods
const ExperimentNodeLabelSelector string = "syncthing-experiment=yes" // TODO
const StorageNodeLabelSelector string = "syncthing-storage=yes"
const SyncthingPodLabelSelector string = "role=syncthing"
// this file contains functions for interacting with synchthing pods
// PodMap is a map of node name => syncthing daemonset pod
type PodMap map[string]*corev1.Pod
@ -31,7 +28,7 @@ func GetSyncthingPods(client *kubernetes.Clientset, namespace string) (PodMap, e
var token string
for {
page, err := cv1.Nodes().List(context.TODO(), metav1.ListOptions{
LabelSelector: ExperimentNodeLabelSelector,
LabelSelector: stconst.NodeLabelSelector,
Continue: token,
})
if err != nil {
@ -52,7 +49,7 @@ func GetSyncthingPods(client *kubernetes.Clientset, namespace string) (PodMap, e
// GetSyncthingStoragePods returns syncthing daemonset pods that are running on nodes labeled storage nodes
func GetSyncthingStoragePods(client *kubernetes.Clientset, namespace string) (PodMap, error) {
// TODO remove the selector that constrains this to our test nodes
selector := ExperimentNodeLabelSelector + "," + StorageNodeLabelSelector
selector := stconst.StorageNodeLabelSelector
cv1 := client.CoreV1()
nodes := make([]*corev1.Node, 0)
token := ""
@ -87,7 +84,7 @@ func getSyncthingPodsForNodes(client *kubernetes.Clientset, namespace string, no
var token string
for {
page, err := cv1.Pods(namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: SyncthingPodLabelSelector,
LabelSelector: stconst.SyncthingPodLabelSelector,
Continue: token,
})
if err != nil {
@ -171,14 +168,25 @@ func IsStorageNode(client *kubernetes.Clientset, nodeID string) (bool, error) {
return false, fmt.Errorf("couldn't inspect node: %s", err)
}
split := strings.Split(StorageNodeLabelSelector, "=")
storageKey := split[0]
storageValue := split[1]
labels := make(map[string]string)
value, ok := node.ObjectMeta.Labels[storageKey]
if ok && value == storageValue {
return true, nil
parsedLabels := strings.Split(stconst.StorageNodeLabelSelector, ",")
for _, label := range parsedLabels {
split := strings.Split(label, "=")
labels[split[0]] = split[1]
}
return false, nil
klog.Warningf("parsedLabels: %v", parsedLabels) // TODO testing remove me
return nodeHasLabels(node, labels), nil
}
func nodeHasLabels(node *corev1.Node, labels map[string]string) bool {
for key, labelValue := range labels {
value, ok := node.ObjectMeta.Labels[key]
if !ok || labelValue != value {
return false
}
}
return true
}

View File

@ -1,10 +1,40 @@
package driver
package stconst
import (
"k8s.io/klog/v2"
"os"
"path/filepath"
)
const driverVersion = "0.0.1"
var driverName string = os.Getenv("STCSI_DRIVER_NAME")
var DeviceAnnotationKey = driverName + "/syncthing-device-id"
var NodeLabelSelector string = os.Getenv("STCSI_NODE_LABEL") // "syncthing-experiment=yes"
var CsiLabelPrefix = driverName + "/"
var StorageNodeLabelSelector string // syncthing.csi.davepedu.com/syncthing-storage=yes
var SyncthingPodLabelSelector string // syncthing.csi.davepedu.com/role=syncthing"
func init() {
if driverName == "" { // TODO we could validate it too
klog.Fatal("must set STCSI_DRIVER_NAME")
}
var labelPrefix string
if NodeLabelSelector != "" { // TODO validate
labelPrefix = NodeLabelSelector + ","
}
storageNodeLabel := CsiLabelPrefix + "syncthing-storage=yes"
podLabel := CsiLabelPrefix + "role=syncthing"
StorageNodeLabelSelector = labelPrefix + storageNodeLabel
SyncthingPodLabelSelector = labelPrefix + podLabel
SyncthingLeaseLockName = driverName + "-syncthing-csi-lock"
}
// HostContainerBase is the path within the container where the host filesystem is mounted
const HostContainerBase string = "/host"
@ -39,12 +69,6 @@ var HostDeletedVolumeDataDir string = filepath.Join(HostContainerBase, DeletedVo
// SyncthingVolumesRoot is the path inside the Syncthing container where volume folders are
var SyncthingVolumesRoot string = "/data/volumes"
var SyncthingLeaseLockName string = os.Getenv("STCSI_LOCKNAME")
var SyncthingLeaseLockName string
var KubernetesPodName string = os.Getenv("KUBERNETES_POD_NAME")
func init() {
if SyncthingLeaseLockName == "" {
SyncthingLeaseLockName = "syncthing-csi-lock"
}
}