File

Use local files as Kitex’s service governance configuration center

Supported file type

json yaml

Install

go get github.com/kitex-contrib/config-file

Suite

Local file configuration center adapter, kitex converts the configuration in local files into governance feature configurations of kitex through WithSuite.

The usage can be divided into two steps:

  1. Create a file watcher (FileWatcher).
  2. Use WithSuite to introduce configuration monitoring.

Server

type FileConfigServerSuite struct {
	watcher monitor.ConfigMonitor
}

Function Signature:

func NewSuite(key string, watcher filewatcher.FileWatcher, opts ...utils.Option) *FileConfigServerSuite

Sample code(or visit here):

package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/cloudwego/kitex-examples/kitex_gen/api"
	"github.com/cloudwego/kitex-examples/kitex_gen/api/echo"
	"github.com/cloudwego/kitex/pkg/klog"
	"github.com/cloudwego/kitex/pkg/rpcinfo"
	kitexserver "github.com/cloudwego/kitex/server"
	"github.com/kitex-contrib/config-file/filewatcher"
	"github.com/kitex-contrib/config-file/parser"
	fileserver "github.com/kitex-contrib/config-file/server"
)

var _ api.Echo = &EchoImpl{}

const (
	filepath    = "kitex_server.json"
	key         = "ServiceName"
	serviceName = "ServiceName"
)

// EchoImpl implements the last service interface defined in the IDL.
type EchoImpl struct{}

// Echo implements the Echo interface.
func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
	klog.Info("echo called")
	return &api.Response{Message: req.Message}, nil
}

// customed by user
type MyParser struct{}

// one example for custom parser
// if the type of server config is json or yaml,just using default parser
func (p *MyParser) Decode(kind parser.ConfigType, data []byte, config interface{}) error {
	return json.Unmarshal(data, config)
}

func main() {
	klog.SetLevel(klog.LevelDebug)

	// Create a filewatcher object.
	fw, err := filewatcher.NewFileWatcher(filepath)
	if err != nil {
		panic(err)
	}
	// Start monitoring file changes.
	if err = fw.StartWatching(); err != nil {
		panic(err)
	}
	defer fw.StopWatching()

	svr := echo.NewServer(
		new(EchoImpl),
		kitexserver.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
		kitexserver.WithSuite(fileserver.NewSuite(key, fw)), // Add watcher
	)
	if err := svr.Run(); err != nil {
		log.Println("server stopped with error:", err)
	} else {
		log.Println("server stopped")
	}
}

Client

type FileConfigClientSuite struct {
	watcher monitor.ConfigMonitor
	service string
}

Function Signature:

func NewSuite(service, key string, watcher filewatcher.FileWatcher,opts ...utils.Option)*FileConfigClientSuite

Sample code(or visit here):

package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/cloudwego/kitex-examples/kitex_gen/api"
	"github.com/cloudwego/kitex-examples/kitex_gen/api/echo"
	kitexclient "github.com/cloudwego/kitex/client"
	"github.com/cloudwego/kitex/pkg/klog"
	fileclient "github.com/kitex-contrib/config-file/client"
	"github.com/kitex-contrib/config-file/filewatcher"
	"github.com/kitex-contrib/config-file/parser"
)

const (
	filepath    = "kitex_client.json"
	key         = "ClientName/ServiceName"
	serviceName = "ServiceName"
	clientName  = "ClientName"
)

// customed by user
type MyParser struct{}

// one example for custom parser
// if the type of client config is json or yaml,just using default parser
func (p *MyParser) Decode(kind parser.ConfigType, data []byte, config interface{}) error {
	return json.Unmarshal(data, config)
}

func main() {
	klog.SetLevel(klog.LevelDebug)

	// Create a file watcher object.
	fw, err := filewatcher.NewFileWatcher(filepath)
	if err != nil {
		panic(err)
	}
	// Start monitoring file changes.
	if err = fw.StartWatching(); err != nil {
		panic(err)
	}

	go func() {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, os.Interrupt, os.Kill)
		<-sig
		fw.StopWatching()
		os.Exit(1)
	}()

	client, err := echo.NewClient(
		serviceName,
		kitexclient.WithHostPorts("0.0.0.0:8888"),
		kitexclient.WithSuite(fileclient.NewSuite(serviceName, key, fw)),
	)
	if err != nil {
		log.Fatal(err)
	}

	for {
		req := &api.Request{Message: "my request"}
		resp, err := client.Echo(context.Background(), req)
		if err != nil {
			klog.Errorf("take request error: %v", err)
		} else {
			klog.Infof("receive response %v", resp)
		}
		time.Sleep(time.Second * 10)
	}
}

NewFileWatcher

Create a local file watcher

Function Signature:

func NewFileWatcher(filePath string) (FileWatcher, error)

Sample code:

package main

import "github.com/kitex-contrib/config-file/filewatcher"

func main() {
	// Create a filewatcher object.
	fw, err := filewatcher.NewFileWatcher(filepath)
	if err != nil {
		panic(err)
	}
	// Start file monitoring (should be started before importing the Suite).
	if err = fw.StartWatching(); err != nil {
		panic(err)
	}

    // Cancel watching when the program exits.
	go func() {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, os.Interrupt, os.Kill)
		<-sig
		fw.StopWatching()
		os.Exit(1)
	}()
}

In the server-side (Server), due to the characteristics of KitexServer, we only need to define defer fw.StopWatching()

Configuration

Custom Parser

Define a custom format parser and pass it in through option of NewSuite. The format supports json and yaml by default.

Interface Definition:

// ConfigParser the parser for config file.
type ConfigParser interface {
	Decode(kind ConfigType, data []byte, config interface{}) error
}

Sample:

Extend parsing YAML types.

// customed by user
type MyParser struct{}

// one example for custom parser
// if the type of client config is json or yaml,just using default parser
func (p *MyParser) Decode(kind parser.ConfigType, data []byte, config interface{}) error {
	return yaml.Unmarshal(data, config)
}

const YAML parser.ConfigType = "yaml"

func withParser(o *utils.Options) {
	o.Parser = &MyParser{}
	o.Params = &parser.ConfigParam{
		Type: YAML,
	}
}

// passed in with `NewSuite`

// server
svr := echo.NewServer(
		new(EchoImpl),
		kitexserver.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
		kitexserver.WithSuite(fileserver.NewSuite(key, fw, withParser)), // add watcher
	)

// client
client, err := echo.NewClient(
		serviceName,
		kitexclient.WithHostPorts("0.0.0.0:8888"),
		kitexclient.WithSuite(fileclient.NewSuite(serviceName, key, fw, withParser)),
	)

Governance Policy

In subsequent examples, we set the service name to ServiceName and the client name to ClientName.

Rate Limit

Category=limit

Currently, current limiting only supports the server side, so ClientServiceName is empty.

JSON Schema

Variable Introduction
connection_limit Maximum concurrent connections
qps_limit Maximum request number every 100ms

Example:

{
    "ServiceName": {
        "limit": {
            "connection_limit": 300,
            "qps_limit": 200
        }
    }
}

Note:

  • The granularity of the current limit configuration is server global, regardless of client or method.
  • Not configured or value is 0 means not enabled.
  • connection_limit and qps_limit can be configured independently, e.g. connection_limit = 100, qps_limit = 0
  • Multiple different rate limiting strategies for multiple services can be written within a single JSON file. Simply use filewatch to monitor the same file and pass in different keys. As shown in the example, the key is ServiceName

Retry Policy

Category=retry

JSON Schema

Variable Introduction
type 0: failure_policy 1: backup_policy
failure_policy.backoff_policy Can only be set one of fixed none random

Example:

key is ClientName/ServiceName

{
    "ClientName/ServiceName": {
        "retry": {
            "*": {
                "enable": true,
                "type": 0,
                "failure_policy": {
                    "stop_policy": {
                        "max_retry_times": 3,
                        "max_duration_ms": 2000,
                        "cb_policy": {
                            "error_rate": 0.2
                        }
                    }
                }
            },
            "Echo": {
                "enable": true,
                "type": 1,
                "backup_policy": {
                    "retry_delay_ms": 200,
                    "stop_policy": {
                        "max_retry_times": 2,
                        "max_duration_ms": 1000,
                        "cb_policy": {
                            "error_rate": 0.3
                        }
                    }
                }
            }
        }
    }
}

Note: retry.Container has built-in support for specifying the default configuration using the * wildcard (see the getRetryer method for details).

RPC Timeout

Category=rpc_timeout

JSON Schema

Example:

key is ClientName/ServiceName

{
    "ClientName/ServiceName": {
        "timeout": {
            "*": {
                "conn_timeout_ms": 100,
                "rpc_timeout_ms": 2000
            },
            "Pay": {
                "conn_timeout_ms": 50,
                "rpc_timeout_ms": 1000
            }
        },
    }
}

Circuit Break

Category=circuit_break

JSON Schema

Variable Introduction
min_sample Minimum statistical sample number

The echo method uses the following configuration (0.3, 100) and other methods use the global default configuration (0.5, 200)

Example:

key is ClientName/ServiceName

{
    "ClientName/ServiceName": {
        "circuitbreaker": {
            "Echo": {
                "enable": true,
                "err_rate": 0.3,
                "min_sample": 100
            }
        },
    }
}

Note: The circuit breaker implementation of kitex does not currently support changing the global default configuration (see initServiceCB for details).

More Info

Refer to example for more usage.

Note

Client/Server Key

For client configuration, you should write all their configurations in the same pair of $UserServiceName/$ServerServiceName, for example

{
    "ClientName/ServiceName": {
        "timeout": {
            "*": {
                "conn_timeout_ms": 100,
                "rpc_timeout_ms": 2000
            },
            "Pay": {
                "conn_timeout_ms": 50,
                "rpc_timeout_ms": 1000
            }
        },
        "circuitbreaker": {
            "Echo": {
                "enable": true,
                "err_rate": 0.3,
                "min_sample": 100
            }
        },
        "retry": {
            "*": {
                "enable": true,
                "type": 0,
                "failure_policy": {
                    "stop_policy": {
                        "max_retry_times": 3,
                        "max_duration_ms": 2000,
                        "cb_policy": {
                            "error_rate": 0.2
                        }
                    }
                }
            },
            "Echo": {
                "enable": true,
                "type": 1,
                "backup_policy": {
                    "retry_delay_ms": 200,
                    "stop_policy": {
                        "max_retry_times": 2,
                        "max_duration_ms": 1000,
                        "cb_policy": {
                            "error_rate": 0.3
                        }
                    }
                }
            }
        }
    }
}

Compatibility

The project uses the new features of sync/atomic added in version 1.19, so the Go version must be >= 1.19


Last modified March 2, 2024 : docs: optimize loadbalance doc (#992) (351ce08)