Decentralised Art Server
High-performance C++ backend that exposes HTML interface and a secure REST API for managing Performative Transactions entities
 
Loading...
Searching...
No Matches
events_runtime.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <atomic>
4#include <cstdint>
5#include <filesystem>
6#include <memory>
7#include <optional>
8#include <string>
9#include <vector>
10
11#include "native.h"
12#include <asio.hpp>
13
14#include "sqlite/wal_store.hpp"
15
16#include "events_feed.hpp"
17#include "sqlite_hot_store.hpp"
18
19namespace dcn::evm
20{
21 class EVM;
22}
23
24namespace dcn::events
25{
26 class RpcClient;
27
28 constexpr std::size_t DEFAULT_PROJECT_BATCH_SIZE = 256;
29
30 std::int64_t reorgLookbackStart(std::int64_t next_from_block, std::size_t reorg_window_blocks);
31
32
34 {
35 std::filesystem::path hot_db_path;
36 std::filesystem::path archive_root;
37
38 int chain_id = 1;
39 bool ingestion_enabled = false;
41 evm::EVM * local_evm = nullptr;
42 std::string rpc_url;
43 std::string registry_address;
44 std::optional<std::int64_t> start_block = std::nullopt;
45 unsigned int rpc_timeout_ms = 7000;
46 unsigned int poll_interval_ms = 5000;
47 unsigned int confirmations = 12;
48 unsigned int block_batch_size = 500;
49
50 std::size_t hot_window_days = 90;
51 std::size_t reorg_window_blocks = 2048;
52 std::int64_t outbox_retention_ms = 7LL * 24 * 60 * 60 * 1000;
53
54 unsigned int projector_interval_ms = 200;
55 unsigned int archive_interval_ms = 30 * 1000;
56 unsigned int wal_checkpoint_interval_ms = 15 * 1000;
57 std::string chain_namespace;
58 };
59
61 {
62 public:
63 EventRuntime(asio::io_context & io_context, EventRuntimeConfig config);
65
66 EventRuntime(const EventRuntime &) = delete;
67 EventRuntime & operator=(const EventRuntime &) = delete;
68
71
72 void start();
73 void requestStop();
74 asio::awaitable<void> stop();
75 bool running() const;
76 bool ingestionEnabled() const;
78 std::uint64_t rpcTransportCallCount() const;
79
80 FeedPage getFeedPage(const FeedQuery & query) const override;
81 StreamPage getStreamPage(const StreamQuery & query) const override;
82 std::int64_t minAvailableStreamSeq() const override;
83
84 asio::awaitable<storage::sqlite::WalCheckpointStats> checkpointWal(storage::sqlite::WalCheckpointMode mode) const override;
85
86 private:
87 asio::awaitable<void> _sleepFor(const std::uint64_t ms) const;
88 nlohmann::json _rpcCall(const std::string & method, nlohmann::json params) const;
89
90 asio::awaitable<std::optional<std::int64_t>> _storeLoadNextFromBlock(int chain_id) const;
91 asio::awaitable<std::optional<std::uint64_t>> _storeLoadNextLocalSeq(int chain_id) const;
92 asio::awaitable<bool> _storeSaveNextLocalSeq(int chain_id, std::uint64_t next_seq, std::int64_t now_ms) const;
93 asio::awaitable<std::vector<std::int64_t>> _storeLoadReorgWindowBlocks(
94 int chain_id,
95 std::int64_t from_block,
96 std::int64_t to_block) const;
97 asio::awaitable<bool> _storeIngestBatch(
98 int chain_id,
99 std::vector<RawChainLog> raw_events,
100 std::vector<DecodedEvent> decoded_events,
101 std::vector<ChainBlockInfo> block_infos,
102 std::int64_t next_from_block,
103 std::int64_t now_ms,
104 std::optional<std::uint64_t> next_local_seq = std::nullopt) const;
105 asio::awaitable<bool> _storeApplyFinality(
106 int chain_id,
107 FinalityHeights heights,
108 std::int64_t now_ms,
109 std::size_t reorg_window_blocks) const;
110 asio::awaitable<std::size_t> _storeProjectBatch(std::size_t limit, std::int64_t now_ms) const;
111 asio::awaitable<bool> _storeRunArchiveCycle(int chain_id, std::size_t hot_window_days, std::int64_t now_ms) const;
112
113 asio::awaitable<FinalityHeights> _resolveFinality(const std::int64_t head) const;
114
115 asio::awaitable<void> _runLocalIngestionLoop();
116 asio::awaitable<void> _runIngestionLoop();
117 asio::awaitable<void> _runProjectorLoop();
118 asio::awaitable<void> _runArchiveLoop();
119 asio::awaitable<void> _runMaintenanceLoop();
120 asio::awaitable<void> _waitForLoops();
121
122 private:
123 asio::io_context & _io_context;
124 EventRuntimeConfig _config;
125 asio::strand<asio::io_context::executor_type> _write_strand;
126 std::shared_ptr<RpcClient> _rpc_client;
127
128 std::shared_ptr<SQLiteHotStore> _store;
129 std::unique_ptr<IEventDecoder> _decoder;
130
131 std::atomic<bool> _stop_requested{false};
132 std::atomic<bool> _running{false};
133
134 mutable std::atomic<bool> _blocking_transport_on_hot_write_strand{false};
135
136 std::atomic<std::size_t> _active_loop_count{0};
137 };
138}
Definition events_runtime.hpp:61
EventRuntime(EventRuntime &&)=delete
asio::awaitable< storage::sqlite::WalCheckpointStats > checkpointWal(storage::sqlite::WalCheckpointMode mode) const override
Definition events_runtime.cpp:369
void requestStop()
Definition events_runtime.cpp:191
void start()
Definition events_runtime.cpp:152
std::uint64_t rpcTransportCallCount() const
Definition events_runtime.cpp:238
bool running() const
Definition events_runtime.cpp:223
bool blockingTransportObservedOnHotWriteStrand() const
Definition events_runtime.cpp:233
EventRuntime & operator=(const EventRuntime &)=delete
std::int64_t minAvailableStreamSeq() const override
Definition events_runtime.cpp:253
asio::awaitable< void > stop()
Definition events_runtime.cpp:201
EventRuntime(const EventRuntime &)=delete
EventRuntime & operator=(EventRuntime &&)=delete
bool ingestionEnabled() const
Definition events_runtime.cpp:228
~EventRuntime()
Definition events_runtime.cpp:141
FeedPage getFeedPage(const FeedQuery &query) const override
Definition events_runtime.cpp:243
StreamPage getStreamPage(const StreamQuery &query) const override
Definition events_runtime.cpp:248
Definition events_feed.hpp:91
Definition evm.hpp:45
Definition wal_store.hpp:10
Definition decoded_event.hpp:11
constexpr std::size_t DEFAULT_PROJECT_BATCH_SIZE
Definition events_runtime.hpp:28
std::int64_t reorgLookbackStart(std::int64_t next_from_block, std::size_t reorg_window_blocks)
Definition events_runtime.cpp:31
Definition events_runtime.hpp:20
WalCheckpointMode
Definition wal.hpp:9
Definition events_runtime.hpp:34
unsigned int archive_interval_ms
Definition events_runtime.hpp:55
unsigned int wal_checkpoint_interval_ms
Definition events_runtime.hpp:56
unsigned int confirmations
Definition events_runtime.hpp:47
evm::EVM * local_evm
Definition events_runtime.hpp:41
std::string chain_namespace
Definition events_runtime.hpp:57
bool ingestion_enabled
Definition events_runtime.hpp:39
std::filesystem::path hot_db_path
Definition events_runtime.hpp:35
unsigned int block_batch_size
Definition events_runtime.hpp:48
std::size_t reorg_window_blocks
Definition events_runtime.hpp:51
unsigned int projector_interval_ms
Definition events_runtime.hpp:54
std::string registry_address
Definition events_runtime.hpp:43
std::optional< std::int64_t > start_block
Definition events_runtime.hpp:44
bool use_local_evm_source
Definition events_runtime.hpp:40
std::filesystem::path archive_root
Definition events_runtime.hpp:36
unsigned int rpc_timeout_ms
Definition events_runtime.hpp:45
std::int64_t outbox_retention_ms
Definition events_runtime.hpp:52
unsigned int poll_interval_ms
Definition events_runtime.hpp:46
std::string rpc_url
Definition events_runtime.hpp:42
int chain_id
Definition events_runtime.hpp:38
std::size_t hot_window_days
Definition events_runtime.hpp:50
Definition events_feed.hpp:57
Definition events_feed.hpp:49
Definition events_ingest.hpp:12
Definition events_feed.hpp:81
Definition events_feed.hpp:64