Merge branch 'main' into SEND027

This commit is contained in:
2026-01-16 11:00:33 +00:00
49 changed files with 399 additions and 319 deletions

154
api/pkg/chain/asset.go Normal file
View File

@@ -0,0 +1,154 @@
package chain
import (
"strings"
"github.com/tech/sendico/pkg/merrors"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
)
func AssetString(asset *chainv1.Asset) string {
if asset == nil {
return ""
}
symbol := strings.ToUpper(strings.TrimSpace(asset.GetTokenSymbol()))
if symbol == "" {
return ""
}
suffix := assetSuffix(asset.GetChain())
if suffix == "" {
return symbol
}
return symbol + "-" + suffix
}
func ParseAsset(assetString, network, tokenSymbol, contractAddress string) (*chainv1.Asset, error) {
token := strings.TrimSpace(tokenSymbol)
net := strings.TrimSpace(network)
contract := strings.TrimSpace(contractAddress)
if token == "" {
token = TokenFromAssetString(assetString)
}
if net == "" {
net = NetworkFromAssetString(assetString)
}
if token == "" {
return nil, merrors.InvalidArgument("asset: token_symbol is required")
}
chain := NetworkFromString(net)
if chain == chainv1.ChainNetwork_CHAIN_NETWORK_UNSPECIFIED {
return nil, merrors.InvalidArgument("asset: network is required")
}
return &chainv1.Asset{
Chain: chain,
TokenSymbol: strings.ToUpper(token),
ContractAddress: strings.ToLower(contract),
}, nil
}
func TokenFromAssetString(asset string) string {
trimmed := strings.TrimSpace(asset)
if trimmed == "" {
return ""
}
if idx := strings.Index(trimmed, "-"); idx > 0 {
return trimmed[:idx]
}
return trimmed
}
func NetworkFromAssetString(asset string) string {
trimmed := strings.TrimSpace(asset)
if trimmed == "" {
return ""
}
idx := strings.Index(trimmed, "-")
if idx < 0 {
return ""
}
return strings.TrimSpace(trimmed[idx+1:])
}
func NetworkName(chain chainv1.ChainNetwork) string {
if chain == chainv1.ChainNetwork_CHAIN_NETWORK_UNSPECIFIED {
return ""
}
if name, ok := chainv1.ChainNetwork_name[int32(chain)]; ok {
return strings.TrimPrefix(name, "CHAIN_NETWORK_")
}
return ""
}
func NetworkAlias(chain chainv1.ChainNetwork) string {
switch chain {
case chainv1.ChainNetwork_CHAIN_NETWORK_ETHEREUM_MAINNET:
return "ETH"
case chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET:
return "TRON"
case chainv1.ChainNetwork_CHAIN_NETWORK_TRON_NILE:
return "TRON_NILE"
case chainv1.ChainNetwork_CHAIN_NETWORK_ARBITRUM_ONE:
return "ARBITRUM"
default:
name := NetworkName(chain)
if name == "" {
fallback := strings.TrimPrefix(strings.TrimSpace(chain.String()), "CHAIN_NETWORK_")
return strings.ToUpper(fallback)
}
return strings.ToUpper(name)
}
}
func NetworkFromString(value string) chainv1.ChainNetwork {
normalized := normalizeNetworkString(value)
if normalized == "" {
return chainv1.ChainNetwork_CHAIN_NETWORK_UNSPECIFIED
}
if val, ok := chainv1.ChainNetwork_value[normalized]; ok {
return chainv1.ChainNetwork(val)
}
switch normalized {
case "ETH", "ETHEREUM", "ETH_MAINNET":
return chainv1.ChainNetwork_CHAIN_NETWORK_ETHEREUM_MAINNET
case "TRON":
return chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET
case "TRON_NILE":
return chainv1.ChainNetwork_CHAIN_NETWORK_TRON_NILE
case "ARB", "ARBITRUM":
return chainv1.ChainNetwork_CHAIN_NETWORK_ARBITRUM_ONE
}
if !strings.HasPrefix(normalized, "CHAIN_NETWORK_") {
normalized = "CHAIN_NETWORK_" + normalized
}
if val, ok := chainv1.ChainNetwork_value[normalized]; ok {
return chainv1.ChainNetwork(val)
}
return chainv1.ChainNetwork_CHAIN_NETWORK_UNSPECIFIED
}
func normalizeNetworkString(value string) string {
trimmed := strings.TrimSpace(value)
if trimmed == "" {
return ""
}
normalized := strings.ToUpper(trimmed)
normalized = strings.ReplaceAll(normalized, " ", "_")
normalized = strings.ReplaceAll(normalized, "-", "_")
return normalized
}
func assetSuffix(chain chainv1.ChainNetwork) string {
switch chain {
case chainv1.ChainNetwork_CHAIN_NETWORK_ETHEREUM_MAINNET:
return "ETH"
case chainv1.ChainNetwork_CHAIN_NETWORK_ARBITRUM_ONE:
return "ARB"
case chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET:
return "TRC20"
case chainv1.ChainNetwork_CHAIN_NETWORK_TRON_NILE:
return "TRC20"
default:
return ""
}
}

View File

@@ -0,0 +1,55 @@
package chain
import (
"testing"
"github.com/stretchr/testify/require"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
)
func TestNetworkName(t *testing.T) {
require.Equal(t, "TRON_MAINNET", NetworkName(chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET))
require.Equal(t, "", NetworkName(chainv1.ChainNetwork_CHAIN_NETWORK_UNSPECIFIED))
}
func TestNetworkAlias(t *testing.T) {
require.Equal(t, "ETH", NetworkAlias(chainv1.ChainNetwork_CHAIN_NETWORK_ETHEREUM_MAINNET))
require.Equal(t, "TRON", NetworkAlias(chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET))
require.Equal(t, "TRON_NILE", NetworkAlias(chainv1.ChainNetwork_CHAIN_NETWORK_TRON_NILE))
require.Equal(t, "ARBITRUM", NetworkAlias(chainv1.ChainNetwork_CHAIN_NETWORK_ARBITRUM_ONE))
require.Equal(t, "UNSPECIFIED", NetworkAlias(chainv1.ChainNetwork_CHAIN_NETWORK_UNSPECIFIED))
}
func TestNetworkFromString(t *testing.T) {
require.Equal(t, chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET, NetworkFromString("TRON_MAINNET"))
require.Equal(t, chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET, NetworkFromString("CHAIN_NETWORK_TRON_MAINNET"))
require.Equal(t, chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET, NetworkFromString("tron mainnet"))
require.Equal(t, chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET, NetworkFromString("tron-mainnet"))
require.Equal(t, chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET, NetworkFromString("TRON"))
require.Equal(t, chainv1.ChainNetwork_CHAIN_NETWORK_ETHEREUM_MAINNET, NetworkFromString("ETH"))
require.Equal(t, chainv1.ChainNetwork_CHAIN_NETWORK_ARBITRUM_ONE, NetworkFromString("ARBITRUM"))
require.Equal(t, chainv1.ChainNetwork_CHAIN_NETWORK_UNSPECIFIED, NetworkFromString(""))
}
func TestAssetString(t *testing.T) {
asset := &chainv1.Asset{
Chain: chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET,
TokenSymbol: "usdt",
}
require.Equal(t, "USDT-TRC20", AssetString(asset))
}
func TestParseAsset(t *testing.T) {
asset, err := ParseAsset("USDT-TRC20", "TRON_MAINNET", "", "")
require.NoError(t, err)
require.Equal(t, chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET, asset.GetChain())
require.Equal(t, "USDT", asset.GetTokenSymbol())
require.Equal(t, "", asset.GetContractAddress())
asset, err = ParseAsset("USDT-TRC20", "CHAIN_NETWORK_TRON_MAINNET", "", "")
require.NoError(t, err)
require.Equal(t, chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET, asset.GetChain())
_, err = ParseAsset("USDT-TRC20", "", "USDT", "")
require.Error(t, err)
}

View File

@@ -5,7 +5,7 @@ go 1.24.0
require (
github.com/casbin/casbin/v2 v2.135.0
github.com/casbin/mongodb-adapter/v3 v3.7.0
github.com/go-chi/chi/v5 v5.2.3
github.com/go-chi/chi/v5 v5.2.4
github.com/google/uuid v1.6.0
github.com/mattn/go-colorable v0.1.14
github.com/mitchellh/mapstructure v1.5.0
@@ -93,6 +93,6 @@ require (
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.33.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260112192933-99fd39fd28a9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260114163908-3f89685c29c3 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -43,8 +43,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE=
github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
github.com/go-chi/chi/v5 v5.2.4 h1:WtFKPHwlywe8Srng8j2BhOD9312j9cGUxG1SP4V2cR4=
github.com/go-chi/chi/v5 v5.2.4/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@@ -269,8 +269,8 @@ gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda h1:+2XxjfsAu6vqFxwGBRcHiMaDCuZiqXGDUDVWVtrFAnE=
google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260112192933-99fd39fd28a9 h1:IY6/YYRrFUk0JPp0xOVctvFIVuRnjccihY5kxf5g0TE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260112192933-99fd39fd28a9/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260114163908-3f89685c29c3 h1:C4WAdL+FbjnGlpp2S+HMVhBeCq2Lcib4xZqfPNF6OoQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260114163908-3f89685c29c3/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=

View File

@@ -33,7 +33,7 @@ func (b *NatsBroker) Subscribe(event model.NotificationEvent) (<-chan me.Envelop
b.logger.Info("Subscribing to subject", zap.String("subject", subject))
// Create a bidirectional channel to send messages to
messageChan := make(chan me.Envelope)
messageChan := make(chan me.Envelope, b.bufferSize)
b.mu.Lock()
defer b.mu.Unlock()

View File

@@ -20,11 +20,12 @@ import (
type natsSubscriotions = map[string]*TopicSubscription
type NatsBroker struct {
nc *nats.Conn
js nats.JetStreamContext
logger *zap.Logger
topicSubs natsSubscriotions
mu sync.Mutex
nc *nats.Conn
js nats.JetStreamContext
logger *zap.Logger
topicSubs natsSubscriotions
mu sync.Mutex
bufferSize int
}
type envConfig struct {
@@ -32,6 +33,8 @@ type envConfig struct {
Port int
}
const defaultConsumerBufferSize = 1024
// loadEnv gathers and validates connection details from environment variables
// listed in the Settings struct. Invalid or missing values surface as a typed
// InvalidArgument error so callers can decide how to handle them.
@@ -83,6 +86,10 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e
var err error
var cfg *envConfig
var natsURL string
bufferSize := defaultConsumerBufferSize
if settings != nil && settings.BufferSize > 0 {
bufferSize = settings.BufferSize
}
if settings != nil && strings.TrimSpace(settings.URLEnv) != "" {
urlVal := strings.TrimSpace(os.Getenv(settings.URLEnv))
if urlVal != "" {
@@ -123,8 +130,9 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e
}
res := &NatsBroker{
logger: l.Named("nats"),
topicSubs: natsSubscriotions{},
logger: l.Named("nats"),
topicSubs: natsSubscriotions{},
bufferSize: bufferSize,
}
if res.nc, err = nats.Connect(natsURL, opts...); err != nil {

View File

@@ -9,4 +9,5 @@ type Settings struct {
NATSName string `mapstructure:"broker_name" yaml:"broker_name"`
MaxReconnects int `mapstructure:"max_reconnects" yaml:"max_reconnects"`
ReconnectWait int `mapstructure:"reconnect_wait" yaml:"reconnect_wait"`
BufferSize int `mapstructure:"buffer_size" yaml:"buffer_size"`
}