본문 바로가기
golang

[golang] kubernetes Operator 만들기 Kubebuilder Controller-runtime 6 - Controller

by weq155 2023. 11. 16.
반응형
반응형

 

[golang] kubernetes Operator 만들기 Controller-runtime 1 - Controller

[golang] kubernetes Operator 만들기 Controller-runtime 2 - Overview

[golang] kubernetes Operator 만들기 Controller-runtime 3 - Manager

[golang] kubernetes Operator 만들기 Controller-runtime 4 - Builder

[golang] kubernetes Operator 만들기 Controller-runtime 5 - Reconciler

[golang] kubernetes Operator 만들기 Controller-runtime 6 - Controller

Controller란?

Controller 컴포넌트는 Reconciler안에 Reconcile function과 관련된 타겟 리소스를

Kubernetes API 서버 통해 모니터링한다.

 

프로세스는 아래의 작동 순서로 동작한다.

  1. Kubernetes API에서 타겟 오브젝트의 변경을 감지한다.
  2. Reconciler가 필요할 때 Reconcile 함수를 호출한다.
  3. Reconcile function 내에서 Reconciler는 타겟 오브젝트의 최신 상태를 검색하고 필요하다면 업데이트한다.

Controller를 초기화 하기 위해서는 타겟리소스와 함께 Reconciler가 제공되어야한다.

controller-runtime의 Builder 컴포넌트에 의해 초기화 프로세스를 진행한다.

 

Builder 컴포넌트가 디자인된 Controller를 초기화 할때 제공된 매니저와 함께 Controller가 등록된다.

 

마지막으로 Manager의 Start 함수가 트리거되면 등록된 컨트롤러가 시작된다.

 

위의 순서가 Controller가 초기화되고 실행되는 프로세스이다.

이제 Controller 컴포넌트에 대해 자세하게 알아보자

 

Interface

Controller interface:

// Controller implements a Kubernetes API.  A Controller manages a work queue fed reconcile.Requests
// from source.Sources.  Work is performed through the reconcile.Reconciler for each enqueued item.
// Work typically is reads and writes Kubernetes objects to make the system state match the state specified
// in the object Spec.
type Controller interface {
 // Reconciler is called to reconcile an object by Namespace/Name
 reconcile.Reconciler

 // Watch takes events provided by a Source and uses the EventHandler to
 // enqueue reconcile.Requests in response to the events.
 //
 // Watch may be provided one or more Predicates to filter events before
 // they are given to the EventHandler.  Events will be passed to the
 // EventHandler if all provided Predicates evaluate to true.
 Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error

 // Start starts the controller.  Start blocks until the context is closed or a
 // controller has an error starting.
 Start(ctx context.Context) error

 // GetLogger returns this controller logger prefilled with basic information.
 GetLogger() logr.Logger
}

 

주의할 점은 struct 이름도 Controller로 같기 때문에 헷갈림

 

이제 구현으로 넘어가보자

 

Implementation

Controller 정의

type Controller struct {
 // Name is used to uniquely identify a Controller in tracing, logging and monitoring.  Name is required.
 Name string

 // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
 MaxConcurrentReconciles int

 // Reconciler is a function that can be called at any time with the Name / Namespace of an object and
 // ensures that the state of the system matches the state specified in the object.
 // Defaults to the DefaultReconcileFunc.
 Do reconcile.Reconciler

 // MakeQueue constructs the queue for this controller once the controller is ready to start.
 // This exists because the standard Kubernetes workqueues start themselves immediately, which
 // leads to goroutine leaks if something calls controller.New repeatedly.
 MakeQueue func() workqueue.RateLimitingInterface

 // Queue is an listeningQueue that listens for events from Informers and adds object keys to
 // the Queue for processing
 Queue workqueue.RateLimitingInterface

 // SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates
 // Deprecated: the caller should handle injected fields itself.
 SetFields func(i interface{}) error

 // mu is used to synchronize Controller setup
 mu sync.Mutex

 // Started is true if the Controller has been Started
 Started bool

 // ctx is the context that was passed to Start() and used when starting watches.
 //
 // According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
 // while we usually always strive to follow best practices, we consider this a legacy case and it should
 // undergo a major refactoring and redesign to allow for context to not be stored in a struct.
 ctx context.Context

 // CacheSyncTimeout refers to the time limit set on waiting for cache to sync
 // Defaults to 2 minutes if not set.
 CacheSyncTimeout time.Duration

 // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
 startWatches []watchDescription

 // LogConstructor is used to construct a logger to then log messages to users during reconciliation,
 // or for example when a watch is started.
 // Note: LogConstructor has to be able to handle nil requests as we are also using it
 // outside the context of a reconciliation.
 LogConstructor func(request *reconcile.Request) logr.Logger

 // RecoverPanic indicates whether the panic caused by reconcile should be recovered.
 RecoverPanic bool
}

 

 

struct안엔 여러 필드가 있고, 우리는 전체 필드를 다룰 수 없기 때문에,

중요한 몇개의 코어필드만 추려서 확인해보자

  1. Do(reconcile.Reconiler): Reconciler가 Controller를 지나가는 것은 이 필드에 저장됨 Controller는 Reconcile function을 보유하고 있고 Do.Reconcile을 호출한다.
  2. MakeQueue (func() workqueue.RateLimnitingInterface): workQueue를 만드는 함수
  3. Queue(workqueue.RateLimitingInterface) : 타겟 리소스의 이벤트를 큐에 전달한다

이제는 Controller의 전체적인 흐름을 살펴보자

어떻게 초기화되고 시작되는지 순차적으로 확인해보도록 하자

추가로 어떻게 Manager, Builder 그리고 Reconciler와 함께 동작하는지

  1. Manager와 Reconciler를 준비한다.
  2. NewControllerManagedBy(mgr)과 Complete(r)를 호출해서 Reconciler와 Manager를 Builder와 바인딩 한다. 이어서 Builder.build는 bldr.doController를 호출하여 Controller.New()와 함께 Controller를 생성한다.
  3. 만들어진 Controller는 mgr.Add(controller)를 통해 Manager에 등록되고 builder에서 bldr.ctrl로 설정된다.
  4. 또한 Builder.build는 For, Owns 및 Watch input에 대한 Kind(타겟 리소스의 이벤트 소스)와 EnqueueRequestForObject(소스에서 수신된 이벤트를 reconmise.Request 큐에 보내는 핸들러)를 초기화하는 bldr.doWatch를 호출하고 마지막으로 Controller’s Watch 기능을 호출한다.
  5. Manager.Start는 controller.Start를 호출하고 Queue를 생성하고 각 타겟 리소스(StartWatch)의 소스를 시작하며 Queue 구독을 시작해 변경 항목을 가져오고 Reconciler.Reconcile을 트리거 한다.

Watch func

Watch func은 타겟 리소스의 변경을 모니터링해 이벤트를 Queue 대기열에 넣는다.

Watch function은 Builder의 bldr.doWatch로 부터 For, Owns, Watches를 호출한다.

// Watch implements controller.Controller.
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
 c.mu.Lock()
 defer c.mu.Unlock()

 // Inject Cache into arguments
 if err := c.SetFields(src); err != nil {
  return err
 }
 if err := c.SetFields(evthdler); err != nil {
  return err
 }
 for _, pr := range prct {
  if err := c.SetFields(pr); err != nil {
   return err
  }
 }

 // Controller hasn't started yet, store the watches locally and return.
 //
 // These watches are going to be held on the controller struct until the manager or user calls Start(...).
 if !c.Started {
  c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
  return nil
 }

 c.LogConstructor(nil).Info("Starting EventSource", "source", src)
 return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

 

그 다음 Watch function은 SetFields를 호출하여 공유된 캐시(Manager에 있는 클러스터가 보유한)를 Source, EventHandler 및 Drinkates에 주입한다.

 

Source의 종류는 여러 가지가 있지만 현재로서는 Kubernetes API를 통해 대상 리소스의 변경 사항을 검색하는 구성 요소라고 생각하면 됨.

 

EventHandler는 Source 구성 요소에서 수신된 이벤트를 Request로 변환하는 구성 요소로 간주할 수 있다.

 

Request는 Queue에 대기열로 처리되고 Reciliator.Reconcille에 대한 입력으로 사용

 

마지막으로 src.Start(c.ctx, evthdler, c.Queue, prct…)은 Kubernetes API를 모니터링하고 이벤트를 Queue에 넣는다.

 

 

Start func

Strat func은 Queue를 구독하고 각 queue에 항목에 Reconcile을 호출한다.

 

// Start implements controller.Controller.
func (c *Controller) Start(ctx context.Context) error {
 ...

 c.Queue = c.MakeQueue()
 ...

    // Run a worker thread that just dequeues items, processes them, and marks them done.
    // It enforces that the reconcileHandler is never invoked concurrently with the same object.
    for c.processNextWorkItem(ctx) {
    }

  c.Started = true
  ...
 return nil
}

 

 

the entire codes

 

Start function의 Queue는 생성되고 processNextWorkItem가 for 문에서 계속 호출된다.

processNextWorkItem은 Queue에서 항목을 가져오고 reconcileHandler(ctx, obj)을 호출하고

마지막으로 c.Reconcile(ctx, req)를 호출한다.

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
 obj, shutdown := c.Queue.Get()
 if shutdown {
  // Stop working
  return false
 }

 // We call Done here so the workqueue knows we have finished
 // processing this item. We also must remember to call Forget if we
 // do not want this work item being re-queued. For example, we do
 // not call Forget if a transient error occurs, instead the item is
 // put back on the workqueue and attempted again after a back-off
 // period.
 defer c.Queue.Done(obj)

 ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
 defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)

 c.reconcileHandler(ctx, obj)
 return true
}

 

 

Dependent components

앞에서 언급한 바와 같이 Controller 컴포넌트는 여러 디펜던시 컴포넌트에 의존해 작동한다.

 

 

Source:

Source 인터페이스는 Kubernetes 객체, Webhook callback 등에 대한 생성, 업데이트 및 삭제 작업과 같은 다양한 소스에서 이벤트를 가져오는 방법이다.. 그런 다음 이벤트는 EventHandler에 의해 처리된다.

type Source interface {
 // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
 // to enqueue reconcile.Requests.
 Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

 

Source 인터페이스는 EventHandler를 Informer에 등록하고 reconcile.Request들을 대기열에 등록을 Controller가 내부적으로 사용하는 Start 메소드를 제공한다.

 

이것은 Controller가 타겟 리소스를 감지하고 Reconcile 함수를 트리거할 수 있게 한다.

소스 인터페이스에는 Kind, kindWithCache, Informer, Channel 등 여러 구현이 있다.

예를 들어 bldr.doWatch 함수에서는 kind가 blder.doWatch에서 사용됩니다:

typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
  return err
}
src := &source.Kind{Type: typeForSrc}

 

 

Source의 각 구현은 특정 사용 사례를 제공하며 컨트롤러가 지정된 Source의 이벤트를 효율적으로 모니터링하고 캡처할 수 있도록 합니다. 대부분의 경우 Source는 대상 Kubernetes 리소스의 변경 사항을 캡처하는 데 사용됩니다.

다른 게시물에서 이러한 소스 구현에 대한 자세한 내용을 알아보겠습니다.

 

대기열(workqueue.RateLimiting Interface)

워크큐(workqueue)라고도 하는 큐는 큐에 추가되는 항목을 레이트 제한하는 인터페이스로, 소스(Source)에서 감지하고 이벤트 핸들러(EventHandler)에서 큐에 포함되는 변경 이벤트의 흐름을 관리하는 데 중요한 역할을 합니다.

workqueue.RateLimitingInterface는 controller-runtime 패키지의 일부가 아니라 client-go 패키지의 일부입니다:

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
 DelayingInterface

 // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
 AddRateLimited(item interface{})

 // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
 // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
 // still have to call `Done` on the queue.
 Forget(item interface{})

 // NumRequeues returns back how many times the item was requeued
 NumRequeues(item interface{}) int
}

 

Queue는 Queue에 항목을 추가하는 일정하고 통제된 비율을 유지하여 컨트롤러가 항목을 효율적으로 처리하고 처리할 수 있도록 보장합니다. RateLimiting Interface는 AddRateLimited, Forget, NumReques와 같은 방법을 제공하여 Queue의 항목을 효과적으로 관리할 수 있습니다.

 

컨트롤러의 컨텍스트에서 Queue는 Source에서 감지하고 EventHandler에서 대기열에 있는 변경 이벤트를 저장합니다. 이러한 이벤트는 reconcome.Request 항목으로 표시되며, 이 항목은 컨트롤러의 ProcessNextWorkItem 함수에 의해 소비됩니다.

 

 

EventHandler

EventHandler는 Create, Update, Delete 또는 Generic 이벤트와 같은 다양한 유형의 이벤트를 처리하고 이를 워크큐에 대기하는 인터페이스입니다. 각 이벤트의 특정 로직은 해당 구현에서 정의됩니다.

type EventHandler interface {
 // Create is called in response to an create event - e.g. Pod Creation.
 Create(event.CreateEvent, workqueue.RateLimitingInterface)

 // Update is called in response to an update event -  e.g. Pod Updated.
 Update(event.UpdateEvent, workqueue.RateLimitingInterface)

 // Delete is called in response to a delete event - e.g. Pod Deleted.
 Delete(event.DeleteEvent, workqueue.RateLimitingInterface)

 // Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
 // external trigger request - e.g. reconcile Autoscaling, or a Webhook.
 Generic(event.GenericEvent, workqueue.RateLimitingInterface)
}

 

각 메서드의 구현은 EventHandler가 특정 이벤트를 처리하여 워크큐에 대기열로 만드는 방법을 정의합니다. 예를 들어 EnqueueRequestForObject는 수신된 이벤트를 reconcome.Request 형식으로 Queue에 대기열로 대기열로 대기열로 대기열로 대기열에 대기열로 처리하는 구현 중 하나입니다.

 

 

Predicate

Predicate는 EventHandler가 처리하기 위해 이벤트를 대기열에 올리기 전에 이벤트를 필터링하는 데 사용되는 인터페이스입니다.

// Predicate filters events before enqueuing the keys.
type Predicate interface {
 // Create returns true if the Create event should be processed
 Create(event.CreateEvent) bool

 // Delete returns true if the Delete event should be processed
 Delete(event.DeleteEvent) bool

 // Update returns true if the Update event should be processed
 Update(event.UpdateEvent) bool

 // Generic returns true if the Generic event should be processed
 Generic(event.GenericEvent) bool
}

Predicate 인터페이스는 Create, Delete, Update 및 Generic의 네 가지 메서드를 제공합니다. 이 메서드는 해당 이벤트를 처리해야 하는지 필터링해야 하는지 나타내는 부울 값을 반환합니다.

 

Predicate는 특정 기준에 따라 특정 이벤트를 선택적으로 처리하거나 무시하려는 경우에 유용합니다. 예를 들어, 술어를 사용하여 응용 프로그램의 논리와 관련 없는 이벤트를 필터링하거나 불필요한 이벤트 처리를 건너뛰어 리소스 사용량을 최적화할 수 있습니다.

 

이벤트에 대한 특정 필터 요구 사항이 없는 경우 Predicate를 비워 둘 수 있으며 EventHandler에서 모든 이벤트를 처리합니다.

 

 

Summary

이 게시물에서는 컨트롤러 실행 시간의 핵심 구성 요소 중 하나인 Controller 구성 요소에 대해 살펴보았습니다. 주요 내용은 다음과 같습니다:

  1. 컨트롤러 인터페이스는 컨트롤러와 조정자 간의 긴밀한 협업을 나타내는 조정자 인터페이스를 내장합니다.
  2. 컨트롤러는 Kubernetes API 서버에서 대상 리소스의 변경 사항을 모니터링하고 필요할 때 조정자에서 조정 기능을 호출합니다.
  3. 컨트롤러의 Watch 기능은 Kubernetes API를 통해 대상 개체의 변경 사항을 모니터링하고 감지하는 데 사용되며, 이후 처리를 위해 이벤트를 워크큐에 대기합니다.
  4. 컨트롤러의 시작 기능은 작업 대기열에 가입하고 대기열의 각 항목에 대해 조정 기능을 트리거하는 프로세스입니다.
  5. 컨트롤러의 전체 흐름은 Source → EventHandler → Queue → Controller.Reconcille → Controller.Do.Reconcille로 구성됩니다.

 

Reperence

* 본 글은  Masato Naka의 medium 게시글을 참조 & 번역함

https://nakamasato.medium.com/kubernetes-operator-series-6-controller-runtime-component-controller-604c8905a1e1

 

Kubernetes Operator series 6 — controller-runtime component — Controller

Explore the core components of Kubernetes Operator controller-runtime, understanding the Controller’s role, initialization, and interaction

nakamasato.medium.com

 

반응형