Skip to content

Commit

Permalink
Basic offer management without plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
thomash-acinq committed Mar 5, 2025
1 parent 4683eb0 commit 89ba629
Show file tree
Hide file tree
Showing 22 changed files with 760 additions and 91 deletions.
8 changes: 8 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,14 @@ eclair {
// Frequency at which we clean our DB to remove peer storage from nodes with whom we don't have channels anymore.
cleanup-frequency = 1 day
}

managed-offers {
message-path-min-length = 2

payment-path-count = 2
payment-path-length = 4
payment-path-expiry-delta = 500
}
}

akka {
Expand Down
28 changes: 27 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.{IncomingPayment, OutgoingPayment, OutgoingPaymentStatus}
import fr.acinq.eclair.io.Peer.{GetPeerInfo, OpenChannelResponse, PeerInfo}
import fr.acinq.eclair.io._
import fr.acinq.eclair.message.OnionMessages.{IntermediateNode, Recipient}
import fr.acinq.eclair.message.{OnionMessages, Postman}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.offer.{OfferCreator, OfferManager}
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceiveStandardPayment
import fr.acinq.eclair.payment.relay.Relayer.{ChannelBalance, GetOutgoingChannels, OutgoingChannels, RelayFees}
import fr.acinq.eclair.payment.send.PaymentInitiator._
import fr.acinq.eclair.payment.send.{ClearRecipient, OfferPayment, PaymentIdentifier}
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.wire.protocol.OfferTypes.Offer
import fr.acinq.eclair.wire.protocol.OfferTypes.{Offer, OfferAbsoluteExpiry, OfferIssuer, OfferQuantityMax, OfferTlv}
import fr.acinq.eclair.wire.protocol._
import grizzled.slf4j.Logging
import scodec.bits.ByteVector
Expand Down Expand Up @@ -120,6 +122,12 @@ trait Eclair {

def receive(description: Either[String, ByteVector32], amount_opt: Option[MilliSatoshi], expire_opt: Option[Long], fallbackAddress_opt: Option[String], paymentPreimage_opt: Option[ByteVector32], privateChannelIds_opt: Option[List[ByteVector32]])(implicit timeout: Timeout): Future[Bolt11Invoice]

def createOffer(description_opt: Option[String], amount_opt: Option[MilliSatoshi], expiry_opt: Option[TimestampSecond], issuer_opt: Option[String], firstNodeId_opt: Option[PublicKey])(implicit timeout: Timeout): Future[Offer]

def disableOffer(offer: Offer)(implicit timeout: Timeout): Unit

def listOffers(onlyActive: Boolean = true)(implicit timeout: Timeout): Future[Seq[Offer]]

def newAddress(): Future[String]

def receivedInfo(paymentHash: ByteVector32)(implicit timeout: Timeout): Future[Option[IncomingPayment]]
Expand Down Expand Up @@ -370,6 +378,24 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
}
}

override def createOffer(description_opt: Option[String], amount_opt: Option[MilliSatoshi], expiry_opt: Option[TimestampSecond], issuer_opt: Option[String], firstNodeId_opt: Option[PublicKey])(implicit timeout: Timeout): Future[Offer] = {
val offerCreator = appKit.system.spawnAnonymous(OfferCreator(appKit.nodeParams, appKit.router, appKit.offerManager, appKit.defaultOfferHandler))
offerCreator.ask[Either[String, Offer]](replyTo => OfferCreator.Create(replyTo, description_opt, amount_opt, expiry_opt, issuer_opt, firstNodeId_opt))
.flatMap {
case Left(errorMessage) => Future.failed(new Exception(errorMessage))
case Right(offer) => Future.successful(offer)
}
}

override def disableOffer(offer: Offer)(implicit timeout: Timeout): Unit = {
appKit.offerManager ! OfferManager.DisableOffer(offer)
appKit.nodeParams.db.managedOffers.disableOffer(offer)
}

override def listOffers(onlyActive: Boolean = true)(implicit timeout: Timeout): Future[Seq[Offer]] = Future {
appKit.nodeParams.db.managedOffers.listOffers(onlyActive).map(_.offer)
}

override def newAddress(): Future[String] = {
appKit.wallet match {
case w: BitcoinCoreClient => w.getReceiveAddress()
Expand Down
10 changes: 9 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import fr.acinq.eclair.db._
import fr.acinq.eclair.io.MessageRelay.{RelayAll, RelayChannelsOnly, RelayPolicy}
import fr.acinq.eclair.io.{PeerConnection, PeerReadyNotifier}
import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.offer.OffersConfig
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
import fr.acinq.eclair.router.Announcements.AddressException
Expand Down Expand Up @@ -92,7 +93,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
liquidityAdsConfig: LiquidityAds.Config,
peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig,
onTheFlyFundingConfig: OnTheFlyFunding.Config,
peerStorageConfig: PeerStorageConfig) {
peerStorageConfig: PeerStorageConfig,
offersConfig: OffersConfig) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey

val nodeId: PublicKey = nodeKeyManager.nodeId
Expand Down Expand Up @@ -705,6 +707,12 @@ object NodeParams extends Logging {
writeDelay = FiniteDuration(config.getDuration("peer-storage.write-delay").getSeconds, TimeUnit.SECONDS),
removalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS),
cleanUpFrequency = FiniteDuration(config.getDuration("peer-storage.cleanup-frequency").getSeconds, TimeUnit.SECONDS),
),
offersConfig = OffersConfig(
messagePathMinLength = config.getInt("managed-offers.message-path-min-length"),
paymentPathCount = config.getInt("managed-offers.payment-path-count"),
paymentPathLength = config.getInt("managed-offers.payment-path-length"),
paymentPathCltvExpiryDelta = CltvExpiryDelta(config.getInt("managed-offers.payment-path-expiry-delta")),
)
)
}
Expand Down
6 changes: 5 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import fr.acinq.eclair.db.FileBackupHandler.FileBackupParams
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler, PeerStorageCleaner}
import fr.acinq.eclair.io._
import fr.acinq.eclair.message.Postman
import fr.acinq.eclair.payment.offer.OfferManager
import fr.acinq.eclair.payment.offer.{DefaultHandler, OfferManager}
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
Expand Down Expand Up @@ -358,6 +358,8 @@ class Setup(val datadir: File,
dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
defaultOfferHandler = system.spawn(Behaviors.supervise(DefaultHandler(nodeParams, router)).onFailure(typed.SupervisorStrategy.resume), name = "default-offer-handler")
_ = for (offer <- nodeParams.db.managedOffers.listOffers(onlyActive = true)) offerManager ! OfferManager.RegisterOffer(offer.offer, None, offer.pathId_opt, defaultOfferHandler)
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
peerReadyManager = system.spawn(Behaviors.supervise(PeerReadyManager()).onFailure(typed.SupervisorStrategy.restart), name = "peer-ready-manager")
Expand Down Expand Up @@ -399,6 +401,7 @@ class Setup(val datadir: File,
balanceActor = balanceActor,
postman = postman,
offerManager = offerManager,
defaultOfferHandler = defaultOfferHandler,
wallet = bitcoinClient)

zmqBlockTimeout = after(5 seconds, using = system.scheduler)(Future.failed(BitcoinZMQConnectionTimeoutException))
Expand Down Expand Up @@ -468,6 +471,7 @@ case class Kit(nodeParams: NodeParams,
balanceActor: typed.ActorRef[BalanceActor.Command],
postman: typed.ActorRef[Postman.Command],
offerManager: typed.ActorRef[OfferManager.Command],
defaultOfferHandler: typed.ActorRef[OfferManager.HandlerCommand],
wallet: OnChainWallet with OnchainPubkeyCache)

object Kit {
Expand Down
5 changes: 5 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ trait Databases {
def channels: ChannelsDb
def peers: PeersDb
def payments: PaymentsDb
def managedOffers: OffersDb
def pendingCommands: PendingCommandsDb
def liquidity: LiquidityDb
//@formatter:on
Expand All @@ -66,6 +67,7 @@ object Databases extends Logging {
channels: SqliteChannelsDb,
peers: SqlitePeersDb,
payments: SqlitePaymentsDb,
managedOffers: SqliteOffersDb,
pendingCommands: SqlitePendingCommandsDb,
private val backupConnection: Connection) extends Databases with FileBackup {
override def backup(backupFile: File): Unit = SqliteUtils.using(backupConnection.createStatement()) {
Expand All @@ -85,6 +87,7 @@ object Databases extends Logging {
channels = new SqliteChannelsDb(eclairJdbc),
peers = new SqlitePeersDb(eclairJdbc),
payments = new SqlitePaymentsDb(eclairJdbc),
managedOffers = new SqliteOffersDb(eclairJdbc),
pendingCommands = new SqlitePendingCommandsDb(eclairJdbc),
backupConnection = eclairJdbc
)
Expand All @@ -97,6 +100,7 @@ object Databases extends Logging {
channels: PgChannelsDb,
peers: PgPeersDb,
payments: PgPaymentsDb,
managedOffers: PgOffersDb,
pendingCommands: PgPendingCommandsDb,
dataSource: HikariDataSource,
lock: PgLock) extends Databases with ExclusiveLock {
Expand Down Expand Up @@ -157,6 +161,7 @@ object Databases extends Logging {
channels = new PgChannelsDb,
peers = new PgPeersDb,
payments = new PgPaymentsDb,
managedOffers = new PgOffersDb,
pendingCommands = new PgPendingCommandsDb,
dataSource = ds,
lock = lock)
Expand Down
26 changes: 26 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ case class DualDatabases(primary: Databases, secondary: Databases) extends Datab
override val channels: ChannelsDb = DualChannelsDb(primary.channels, secondary.channels)
override val peers: PeersDb = DualPeersDb(primary.peers, secondary.peers)
override val payments: PaymentsDb = DualPaymentsDb(primary.payments, secondary.payments)
override val managedOffers: OffersDb = DualOffersDb(primary.managedOffers, secondary.managedOffers)
override val pendingCommands: PendingCommandsDb = DualPendingCommandsDb(primary.pendingCommands, secondary.pendingCommands)
override val liquidity: LiquidityDb = DualLiquidityDb(primary.liquidity, secondary.liquidity)

Expand Down Expand Up @@ -405,6 +406,31 @@ case class DualPaymentsDb(primary: PaymentsDb, secondary: PaymentsDb) extends Pa
}
}

case class DualOffersDb(primary: OffersDb, secondary: OffersDb) extends OffersDb {

private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-offers").build()))

override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32]): Unit = {
runAsync(secondary.addOffer(offer, pathId_opt))
primary.addOffer(offer, pathId_opt)
}

override def disableOffer(offer: OfferTypes.Offer): Unit = {
runAsync(secondary.disableOffer(offer))
primary.disableOffer(offer)
}

override def enableOffer(offer: OfferTypes.Offer): Unit = {
runAsync(secondary.enableOffer(offer))
primary.enableOffer(offer)
}

override def listOffers(onlyActive: Boolean): Seq[OfferData] = {
runAsync(secondary.listOffers(onlyActive))
primary.listOffers(onlyActive)
}
}

case class DualPendingCommandsDb(primary: PendingCommandsDb, secondary: PendingCommandsDb) extends PendingCommandsDb {

private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-pending-commands").build()))
Expand Down
51 changes: 51 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/OffersDb.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2025 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.TimestampMilli
import fr.acinq.eclair.wire.protocol.OfferTypes.Offer

/**
* Database for offers fully managed by eclair, as opposed to offers managed by a plugin.
*/
trait OffersDb {
/**
* Add an offer managed by eclair.
* @param pathId_opt If the offer uses a blinded path, this is the corresponding pathId.
*/
def addOffer(offer: Offer, pathId_opt: Option[ByteVector32]): Unit

/**
* Disable an offer. The offer is still stored but new invoice requests and new payment attempts for already emitted
* invoices will be rejected.
*/
def disableOffer(offer: Offer): Unit

/**
* Activate an offer that was previously disabled.
*/
def enableOffer(offer: Offer): Unit

/**
* List offers managed by eclair.
* @param onlyActive Whether to return only active offers or also disabled ones.
*/
def listOffers(onlyActive: Boolean): Seq[OfferData]
}

case class OfferData(offer: Offer, pathId_opt: Option[ByteVector32], createdAt: TimestampMilli, isActive: Boolean)
113 changes: 113 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgOffersDb.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2025 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db.pg

import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.TimestampMilli
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.{OfferData, OffersDb}
import fr.acinq.eclair.db.pg.PgUtils.PgLock
import fr.acinq.eclair.wire.protocol.OfferTypes
import fr.acinq.eclair.wire.protocol.OfferTypes.Offer
import grizzled.slf4j.Logging

import java.sql.ResultSet
import javax.sql.DataSource

object PgOffersDb {
val DB_NAME = "offers"
val CURRENT_VERSION = 1
}

class PgOffersDb(implicit ds: DataSource, lock: PgLock) extends OffersDb with Logging {

import PgOffersDb._
import PgUtils.ExtendedResultSet._
import PgUtils._
import lock._

inTransaction { pg =>
using(pg.createStatement()) { statement =>
getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA offers")
statement.executeUpdate("CREATE TABLE offers.managed (offer_id TEXT NOT NULL PRIMARY KEY, offer TEXT NOT NULL, path_id TEXT, created_at TIMESTAMP WITH TIME ZONE NOT NULL, is_active BOOLEAN NOT NULL)")
statement.executeUpdate("CREATE INDEX offer_is_active_idx ON offers.managed(is_active)")
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
setVersion(statement, DB_NAME, CURRENT_VERSION)
}
}

override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32]): Unit = withMetrics("offers/add", DbBackends.Postgres){
withLock { pg =>
using(pg.prepareStatement("INSERT INTO offers.managed (offer_id, offer, path_id, created_at, is_active) VALUES (?, ?, ?, ?, TRUE)")) { statement =>
statement.setString(1, offer.offerId.toHex)
statement.setString(2, offer.toString)
pathId_opt match {
case Some(pathId) => statement.setString(3, pathId.toHex)
case None => statement.setNull(3, java.sql.Types.VARCHAR)
}
statement.setTimestamp(4, TimestampMilli.now().toSqlTimestamp)
statement.executeUpdate()
}
}
}

override def disableOffer(offer: OfferTypes.Offer): Unit = withMetrics("offers/disable", DbBackends.Postgres){
withLock { pg =>
using(pg.prepareStatement("UPDATE offers.managed SET is_active = FALSE WHERE offer_id = ?")) { statement =>
statement.setString(1, offer.offerId.toHex)
statement.executeUpdate()
}
}
}

override def enableOffer(offer: OfferTypes.Offer): Unit = withMetrics("offers/enable", DbBackends.Postgres){
withLock { pg =>
using(pg.prepareStatement("UPDATE offers.managed SET is_active = TRUE WHERE offer_id = ?")) { statement =>
statement.setString(1, offer.offerId.toHex)
statement.executeUpdate()
}
}
}

private def parseOfferData(rs: ResultSet): OfferData = {
OfferData(
Offer.decode(rs.getString("offer")).get,
rs.getStringNullable("path_id").map(ByteVector32.fromValidHex),
TimestampMilli.fromSqlTimestamp(rs.getTimestamp("created_at")),
rs.getBoolean("is_active")
)
}

override def listOffers(onlyActive: Boolean): Seq[OfferData] = withMetrics("offers/list", DbBackends.Postgres){
withLock { pg =>
if (onlyActive) {
using(pg.prepareStatement("SELECT * FROM offers.managed WHERE is_active = TRUE")) { statement =>
statement.executeQuery().map(parseOfferData).toSeq
}
} else {
using(pg.prepareStatement("SELECT * FROM offers.managed")) { statement =>
statement.executeQuery().map(parseOfferData).toSeq
}
}
}
}
}
Loading

0 comments on commit 89ba629

Please sign in to comment.