payment orchestrator build
All checks were successful
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/fx/1 Pipeline was successful
ci/woodpecker/push/fx/2 Pipeline was successful
ci/woodpecker/push/nats Pipeline was successful
ci/woodpecker/push/ledger Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline was successful

This commit is contained in:
Stephan D
2025-11-11 16:20:34 +01:00
parent 40a3460e6d
commit 6f7ea2bf98
15 changed files with 886 additions and 1 deletions

View File

@@ -0,0 +1,28 @@
package appversion
import (
"github.com/tech/sendico/pkg/version"
vf "github.com/tech/sendico/pkg/version/factory"
)
// Build information populated via ldflags.
var (
Version string
Revision string
Branch string
BuildUser string
BuildDate string
)
// Create returns a printer configured for the payment orchestrator service.
func Create() version.Printer {
vi := version.Info{
Program: "Sendico Payment Orchestrator Service",
Revision: Revision,
Branch: Branch,
BuildUser: BuildUser,
BuildDate: BuildDate,
Version: Version,
}
return vf.Create(&vi)
}

View File

@@ -0,0 +1,298 @@
package serverimp
import (
"context"
"crypto/tls"
"os"
"strings"
"time"
chainclient "github.com/tech/sendico/chain/gateway/client"
oracleclient "github.com/tech/sendico/fx/oracle/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator"
"github.com/tech/sendico/payments/orchestrator/storage"
mongostorage "github.com/tech/sendico/payments/orchestrator/storage/mongo"
"github.com/tech/sendico/pkg/api/routers"
"github.com/tech/sendico/pkg/db"
msg "github.com/tech/sendico/pkg/messaging"
"github.com/tech/sendico/pkg/mlogger"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
"github.com/tech/sendico/pkg/server/grpcapp"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"gopkg.in/yaml.v3"
)
type Imp struct {
logger mlogger.Logger
file string
debug bool
config *config
app *grpcapp.App[storage.Repository]
feesConn *grpc.ClientConn
ledgerClient ledgerclient.Client
gatewayClient chainclient.Client
oracleClient oracleclient.Client
}
type config struct {
*grpcapp.Config `yaml:",inline"`
Fees clientConfig `yaml:"fees"`
Ledger clientConfig `yaml:"ledger"`
Gateway clientConfig `yaml:"gateway"`
Oracle clientConfig `yaml:"oracle"`
}
type clientConfig struct {
Address string `yaml:"address"`
DialTimeoutSecs int `yaml:"dial_timeout_seconds"`
CallTimeoutSecs int `yaml:"call_timeout_seconds"`
InsecureTransport bool `yaml:"insecure"`
}
func (c clientConfig) address() string {
return strings.TrimSpace(c.Address)
}
func (c clientConfig) dialTimeout() time.Duration {
if c.DialTimeoutSecs <= 0 {
return 5 * time.Second
}
return time.Duration(c.DialTimeoutSecs) * time.Second
}
func (c clientConfig) callTimeout() time.Duration {
if c.CallTimeoutSecs <= 0 {
return 3 * time.Second
}
return time.Duration(c.CallTimeoutSecs) * time.Second
}
func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) {
return &Imp{
logger: logger.Named("server"),
file: file,
debug: debug,
}, nil
}
func (i *Imp) Shutdown() {
if i.app != nil {
timeout := 15 * time.Second
if i.config != nil && i.config.Runtime != nil {
timeout = i.config.Runtime.ShutdownTimeout()
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
i.app.Shutdown(ctx)
cancel()
}
if i.ledgerClient != nil {
_ = i.ledgerClient.Close()
}
if i.gatewayClient != nil {
_ = i.gatewayClient.Close()
}
if i.oracleClient != nil {
_ = i.oracleClient.Close()
}
if i.feesConn != nil {
_ = i.feesConn.Close()
}
}
func (i *Imp) Start() error {
cfg, err := i.loadConfig()
if err != nil {
return err
}
i.config = cfg
repoFactory := func(logger mlogger.Logger, conn *db.MongoConnection) (storage.Repository, error) {
return mongostorage.New(logger, conn)
}
feesClient, feesConn := i.initFeesClient(cfg.Fees)
if feesConn != nil {
i.feesConn = feesConn
}
ledgerClient := i.initLedgerClient(cfg.Ledger)
if ledgerClient != nil {
i.ledgerClient = ledgerClient
}
gatewayClient := i.initGatewayClient(cfg.Gateway)
if gatewayClient != nil {
i.gatewayClient = gatewayClient
}
oracleClient := i.initOracleClient(cfg.Oracle)
if oracleClient != nil {
i.oracleClient = oracleClient
}
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
opts := []orchestrator.Option{}
if feesClient != nil {
opts = append(opts, orchestrator.WithFeeEngine(feesClient, cfg.Fees.callTimeout()))
}
if ledgerClient != nil {
opts = append(opts, orchestrator.WithLedgerClient(ledgerClient))
}
if gatewayClient != nil {
opts = append(opts, orchestrator.WithChainGatewayClient(gatewayClient))
}
if oracleClient != nil {
opts = append(opts, orchestrator.WithOracleClient(oracleClient))
}
return orchestrator.NewService(logger, repo, opts...), nil
}
app, err := grpcapp.NewApp(i.logger, "payments_orchestrator", cfg.Config, i.debug, repoFactory, serviceFactory)
if err != nil {
return err
}
i.app = app
return i.app.Start()
}
func (i *Imp) initFeesClient(cfg clientConfig) (feesv1.FeeEngineClient, *grpc.ClientConn) {
addr := cfg.address()
if addr == "" {
return nil, nil
}
dialCtx, cancel := context.WithTimeout(context.Background(), cfg.dialTimeout())
defer cancel()
creds := credentials.NewTLS(&tls.Config{})
if cfg.InsecureTransport {
creds = insecure.NewCredentials()
}
conn, err := grpc.DialContext(dialCtx, addr, grpc.WithTransportCredentials(creds))
if err != nil {
i.logger.Warn("failed to connect to fees service", zap.String("address", addr), zap.Error(err))
return nil, nil
}
i.logger.Info("connected to fees service", zap.String("address", addr))
return feesv1.NewFeeEngineClient(conn), conn
}
func (i *Imp) initLedgerClient(cfg clientConfig) ledgerclient.Client {
addr := cfg.address()
if addr == "" {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), cfg.dialTimeout())
defer cancel()
client, err := ledgerclient.New(ctx, ledgerclient.Config{
Address: addr,
DialTimeout: cfg.dialTimeout(),
CallTimeout: cfg.callTimeout(),
Insecure: cfg.InsecureTransport,
})
if err != nil {
i.logger.Warn("failed to connect to ledger service", zap.String("address", addr), zap.Error(err))
return nil
}
i.logger.Info("connected to ledger service", zap.String("address", addr))
return client
}
func (i *Imp) initGatewayClient(cfg clientConfig) chainclient.Client {
addr := cfg.address()
if addr == "" {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), cfg.dialTimeout())
defer cancel()
client, err := chainclient.New(ctx, chainclient.Config{
Address: addr,
DialTimeout: cfg.dialTimeout(),
CallTimeout: cfg.callTimeout(),
Insecure: cfg.InsecureTransport,
})
if err != nil {
i.logger.Warn("failed to connect to chain gateway service", zap.String("address", addr), zap.Error(err))
return nil
}
i.logger.Info("connected to chain gateway service", zap.String("address", addr))
return client
}
func (i *Imp) initOracleClient(cfg clientConfig) oracleclient.Client {
addr := cfg.address()
if addr == "" {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), cfg.dialTimeout())
defer cancel()
client, err := oracleclient.New(ctx, oracleclient.Config{
Address: addr,
DialTimeout: cfg.dialTimeout(),
CallTimeout: cfg.callTimeout(),
Insecure: cfg.InsecureTransport,
})
if err != nil {
i.logger.Warn("failed to connect to oracle service", zap.String("address", addr), zap.Error(err))
return nil
}
i.logger.Info("connected to oracle service", zap.String("address", addr))
return client
}
func (i *Imp) loadConfig() (*config, error) {
data, err := os.ReadFile(i.file)
if err != nil {
i.logger.Error("Could not read configuration file", zap.String("config_file", i.file), zap.Error(err))
return nil, err
}
cfg := &config{Config: &grpcapp.Config{}}
if err := yaml.Unmarshal(data, cfg); err != nil {
i.logger.Error("Failed to parse configuration", zap.Error(err))
return nil, err
}
if cfg.Runtime == nil {
cfg.Runtime = &grpcapp.RuntimeConfig{ShutdownTimeoutSeconds: 15}
}
if cfg.GRPC == nil {
cfg.GRPC = &routers.GRPCConfig{
Network: "tcp",
Address: ":50062",
EnableReflection: true,
EnableHealth: true,
}
} else {
if strings.TrimSpace(cfg.GRPC.Address) == "" {
cfg.GRPC.Address = ":50062"
}
if strings.TrimSpace(cfg.GRPC.Network) == "" {
cfg.GRPC.Network = "tcp"
}
}
if cfg.Metrics == nil {
cfg.Metrics = &grpcapp.MetricsConfig{Address: ":9403"}
} else if strings.TrimSpace(cfg.Metrics.Address) == "" {
cfg.Metrics.Address = ":9403"
}
return cfg, nil
}

View File

@@ -0,0 +1,12 @@
package server
import (
serverimp "github.com/tech/sendico/payments/orchestrator/internal/server/internal"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/server"
)
// Create initialises the payment orchestrator server implementation.
func Create(logger mlogger.Logger, file string, debug bool) (server.Application, error) {
return serverimp.Create(logger, file, debug)
}

View File

@@ -3,12 +3,12 @@ package orchestrator
import (
"strings"
"github.com/shopspring/decimal"
oracleclient "github.com/tech/sendico/fx/oracle/client"
"github.com/tech/sendico/pkg/merrors"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
oraclev1 "github.com/tech/sendico/pkg/proto/oracle/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"github.com/shopspring/decimal"
"google.golang.org/protobuf/proto"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"