Skip to content

Commit 28ba19c

Browse files
socutessocutes
andauthored
refactor: Improve multi-tenancy and auto topic rewrite processing (robustmq#1799)
* dev Signed-off-by: socutes <socutes@gmail.com> * dev Signed-off-by: socutes <socutes@gmail.com> * dev Signed-off-by: socutes <socutes@gmail.com> * dev Signed-off-by: socutes <socutes@gmail.com> * dev Signed-off-by: socutes <socutes@gmail.com> * dev Signed-off-by: socutes <socutes@gmail.com> * dev Signed-off-by: socutes <socutes@gmail.com> --------- Signed-off-by: socutes <socutes@gmail.com> Co-authored-by: socutes <socutes@gmail.com>
1 parent f301343 commit 28ba19c

89 files changed

Lines changed: 990 additions & 536 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

config/logger.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
[stdout]
1616
kind = "console"
1717
targets = [
18-
{ path = "", level = "info" },
18+
{ path = "", level = "warn" },
1919
{ path = "openraft::raft::responder", level = "error" },
2020
]
2121

docs/zh/Blogs/52.md

Lines changed: 41 additions & 65 deletions
Large diffs are not rendered by default.

src/admin-server/src/mqtt/connector.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ pub struct CreateConnectorReq {
9292

9393
pub failure_strategy: FailureStrategy,
9494

95+
#[validate(length(min = 1, max = 256, message = "Tenant length must be between 1-256"))]
96+
pub tenant: String,
97+
9598
#[validate(length(
9699
min = 1,
97100
max = 256,
@@ -323,7 +326,8 @@ async fn connector_create_inner(
323326
let connector = MQTTConnector {
324327
connector_name: params.connector_name.clone(),
325328
connector_type,
326-
failure_strategy: parse_failure_strategy(params.failure_strategy),
329+
failure_strategy: parse_failure_strategy(&params.tenant, params.failure_strategy),
330+
tenant: params.tenant.clone(),
327331
topic_name: params.topic_name.clone(),
328332
status: MQTTStatus::Idle,
329333
etl_rule: ETLRule::default(),
@@ -427,7 +431,7 @@ fn parse_connector_type(type_str: &str, config: &str) -> Result<ConnectorType, C
427431
Ok(connector_type)
428432
}
429433

430-
fn parse_failure_strategy(strategy: FailureStrategy) -> FailureHandlingStrategy {
434+
fn parse_failure_strategy(tenant: &str, strategy: FailureStrategy) -> FailureHandlingStrategy {
431435
use metadata_struct::connector::{DeadMessageQueueStrategy, DiscardAfterRetryStrategy};
432436

433437
match strategy.strategy.to_lowercase().as_str() {
@@ -447,6 +451,7 @@ fn parse_failure_strategy(strategy: FailureStrategy) -> FailureHandlingStrategy
447451
let retry_total_times = strategy.retry_total_times.unwrap_or(3);
448452
let wait_time_ms = strategy.wait_time_ms.unwrap_or(1000);
449453
FailureHandlingStrategy::DeadMessageQueue(DeadMessageQueueStrategy {
454+
tenant: tenant.to_string(),
450455
topic_name,
451456
retry_total_times,
452457
wait_time_ms,

src/admin-server/src/mqtt/pub_sub.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ use validator::Validate;
1919

2020
#[derive(Debug, Serialize, Deserialize, Validate)]
2121
pub struct PublishReq {
22+
#[validate(length(min = 1, max = 256, message = "Tenant length must be between 1-256"))]
23+
pub tenant: String,
24+
2225
#[validate(length(min = 1, max = 256, message = "Topic length must be between 1-256"))]
2326
pub topic: String,
2427

@@ -33,6 +36,7 @@ pub struct PublishReq {
3336

3437
#[derive(Debug, Serialize, Deserialize)]
3538
pub struct ReadReq {
39+
pub tenant: String,
3640
pub topic: String,
3741
pub offset: u64,
3842
}
@@ -82,6 +86,7 @@ async fn send_inner(state: Arc<HttpState>, params: PublishReq) -> Result<Vec<u64
8286
let client_id = format!("{}_{}", config.cluster_name, config.broker_id);
8387

8488
if let Err(e) = try_init_topic(
89+
&params.tenant,
8590
&params.topic,
8691
&state.mqtt_context.cache_manager,
8792
&state.storage_driver_manager,
@@ -107,7 +112,13 @@ async fn send_inner(state: Arc<HttpState>, params: PublishReq) -> Result<Vec<u64
107112
if let Err(e) = state
108113
.mqtt_context
109114
.retain_message_manager
110-
.save_retain_message(&params.topic, &client_id, &publish, &publish_properties)
115+
.save_retain_message(
116+
&params.tenant,
117+
&params.topic,
118+
&client_id,
119+
&publish,
120+
&publish_properties,
121+
)
111122
.await
112123
{
113124
return Err(CommonError::CommonError(e.to_string()));
@@ -120,7 +131,7 @@ async fn send_inner(state: Arc<HttpState>, params: PublishReq) -> Result<Vec<u64
120131
MqttMessage::build_record(&client_id, &publish, &publish_properties, message_expire)
121132
{
122133
offset = message_storage
123-
.append_topic_message(&params.topic.clone(), vec![record])
134+
.append_topic_message(&params.tenant, &params.topic.clone(), vec![record])
124135
.await?;
125136
}
126137

@@ -144,7 +155,7 @@ pub async fn read_inner(
144155
data.insert(params.topic.clone(), params.offset);
145156

146157
let data = message_storage
147-
.read_topic_message(&params.topic, &data, 100)
158+
.read_topic_message(&params.tenant, &params.topic, &data, 100)
148159
.await?;
149160

150161
for row in data {

src/admin-server/src/mqtt/subscribe.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ pub struct AutoSubscribeListReq {
9999

100100
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Validate)]
101101
pub struct CreateAutoSubscribeReq {
102+
#[validate(length(min = 1, max = 256, message = "Tenant length must be between 1-256"))]
103+
pub tenant: String,
104+
102105
#[validate(length(min = 1, max = 256, message = "Topic length must be between 1-256"))]
103106
pub topic: String,
104107

@@ -114,12 +117,8 @@ pub struct CreateAutoSubscribeReq {
114117

115118
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Validate)]
116119
pub struct DeleteAutoSubscribeReq {
117-
#[validate(length(
118-
min = 1,
119-
max = 256,
120-
message = "Topic name length must be between 1-256"
121-
))]
122-
pub topic_name: String,
120+
#[validate(length(min = 1, max = 256, message = "uniq_id length must be between 1-256"))]
121+
pub uniq_id: String,
123122
}
124123

125124
#[derive(Clone, Serialize, Deserialize)]
@@ -157,6 +156,7 @@ pub struct SlowSubscribeListRow {
157156
pub subscribe_name: String,
158157
}
159158

159+
use common_base::uuid::unique_id;
160160
use common_base::{
161161
http_response::{error_response, success_response},
162162
utils::time_util::timestamp_to_local_datetime,
@@ -347,6 +347,8 @@ pub async fn auto_subscribe_create(
347347
};
348348

349349
let auto_subscribe_rule = MqttAutoSubscribeRule {
350+
uniq_id: unique_id(),
351+
tenant: params.tenant.clone(),
350352
topic: params.topic.clone(),
351353
qos: qos_new,
352354
no_local: params.no_local,
@@ -356,17 +358,12 @@ pub async fn auto_subscribe_create(
356358

357359
let auto_subscribe_storage = AutoSubscribeStorage::new(state.client_pool.clone());
358360
if let Err(e) = auto_subscribe_storage
359-
.set_auto_subscribe_rule(auto_subscribe_rule.clone())
361+
.create_auto_subscribe_rule(auto_subscribe_rule.clone())
360362
.await
361363
{
362364
return error_response(e.to_string());
363365
}
364366

365-
state
366-
.mqtt_context
367-
.cache_manager
368-
.add_auto_subscribe_rule(auto_subscribe_rule);
369-
370367
success_response("success")
371368
}
372369

@@ -376,7 +373,7 @@ pub async fn auto_subscribe_delete(
376373
) -> String {
377374
let auto_subscribe_storage = AutoSubscribeStorage::new(state.client_pool.clone());
378375
if let Err(e) = auto_subscribe_storage
379-
.delete_auto_subscribe_rule(params.topic_name.clone())
376+
.delete_auto_subscribe_rule(params.uniq_id.clone())
380377
.await
381378
{
382379
return error_response(e.to_string());
@@ -385,7 +382,7 @@ pub async fn auto_subscribe_delete(
385382
state
386383
.mqtt_context
387384
.cache_manager
388-
.delete_auto_subscribe_rule(&params.topic_name);
385+
.delete_auto_subscribe_rule(&params.uniq_id);
389386

390387
success_response("success")
391388
}

src/admin-server/src/mqtt/topic.rs

Lines changed: 65 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use validator::Validate;
4040

4141
#[derive(Serialize, Deserialize, Debug)]
4242
pub struct TopicListReq {
43+
pub tenant: Option<String>,
4344
pub topic_name: Option<String>,
4445
pub topic_type: Option<String>, // "all", "normal", "system"
4546
pub limit: Option<u32>,
@@ -53,11 +54,13 @@ pub struct TopicListReq {
5354

5455
#[derive(Serialize, Deserialize, Debug)]
5556
pub struct TopicDetailReq {
57+
pub tenant: String,
5658
pub topic_name: String,
5759
}
5860

5961
#[derive(Serialize, Deserialize, Debug, Validate)]
6062
pub struct TopicDeleteRep {
63+
pub tenant: String,
6164
#[validate(length(
6265
min = 1,
6366
max = 256,
@@ -79,6 +82,9 @@ pub struct TopicRewriteReq {
7982

8083
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Validate)]
8184
pub struct CreateTopicRewriteReq {
85+
#[validate(length(min = 1, max = 128, message = "Tenant length must be between 1-128"))]
86+
pub tenant: String,
87+
8288
#[validate(length(min = 1, max = 50, message = "Action length must be between 1-50"))]
8389
#[validate(custom(function = "validate_rewrite_action"))]
8490
pub action: String,
@@ -116,6 +122,9 @@ fn validate_rewrite_action(action: &str) -> Result<(), validator::ValidationErro
116122

117123
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Validate)]
118124
pub struct DeleteTopicRewriteReq {
125+
#[validate(length(min = 1, max = 128, message = "Tenant length must be between 1-128"))]
126+
pub tenant: String,
127+
119128
#[validate(length(min = 1, max = 50, message = "Action length must be between 1-50"))]
120129
#[validate(custom(function = "validate_rewrite_action"))]
121130
pub action: String,
@@ -159,41 +168,46 @@ pub async fn topic_list(
159168
params.exact_match,
160169
);
161170

162-
let mut topics: Vec<Topic> = Vec::new();
163-
if let Some(tp) = params.topic_name.clone() {
164-
if let Some(topic) = state
165-
.mqtt_context
166-
.cache_manager
167-
.broker_cache
168-
.get_topic_by_name(&tp)
169-
{
170-
topics.push(topic.clone());
171-
}
171+
let broker_cache = &state.mqtt_context.cache_manager.broker_cache;
172+
let topics: Vec<Topic> = if let Some(tp) = params.topic_name.clone() {
173+
let topic = if let Some(ref t) = params.tenant {
174+
broker_cache.get_topic_by_name(t, &tp)
175+
} else {
176+
broker_cache
177+
.topic_list
178+
.iter()
179+
.find_map(|e| e.value().get(&tp).map(|t| t.clone()))
180+
};
181+
topic.into_iter().collect()
172182
} else {
173183
let topic_type = params.topic_type.as_deref().unwrap_or("all");
174-
for entry in state
175-
.mqtt_context
176-
.cache_manager
177-
.broker_cache
178-
.topic_list
179-
.iter()
180-
{
181-
let topic = entry.value();
182-
let allow = if topic_type == "system" {
183-
entry.topic_name.contains("$")
184-
} else if topic_type == "normal" {
185-
!entry.topic_name.contains("$")
186-
} else {
187-
true
188-
};
189-
190-
if !allow {
191-
continue;
192-
}
193-
194-
topics.push(topic.clone());
195-
}
196-
}
184+
let raw: Vec<Topic> = if let Some(ref t) = params.tenant {
185+
broker_cache.list_topics_by_tenant(t)
186+
} else {
187+
broker_cache
188+
.topic_list
189+
.iter()
190+
.flat_map(|tenant_entry| {
191+
tenant_entry
192+
.value()
193+
.iter()
194+
.map(|topic_entry| topic_entry.value().clone())
195+
.collect::<Vec<_>>()
196+
})
197+
.collect()
198+
};
199+
raw.into_iter()
200+
.filter(|topic| {
201+
if topic_type == "system" {
202+
topic.topic_name.contains('$')
203+
} else if topic_type == "normal" {
204+
!topic.topic_name.contains('$')
205+
} else {
206+
true
207+
}
208+
})
209+
.collect()
210+
};
197211

198212
let filtered = apply_filters(topics, &options);
199213
let sorted = apply_sorting(filtered, &options);
@@ -209,6 +223,7 @@ impl Queryable for Topic {
209223
fn get_field_str(&self, field: &str) -> Option<String> {
210224
match field {
211225
"topic_name" => Some(self.topic_name.clone()),
226+
"tenant" => Some(self.tenant.clone()),
212227
_ => None,
213228
}
214229
}
@@ -236,7 +251,7 @@ async fn read_topic_detail(
236251
.mqtt_context
237252
.cache_manager
238253
.broker_cache
239-
.get_topic_by_name(&params.topic_name)
254+
.get_topic_by_name(&params.tenant, &params.topic_name)
240255
{
241256
topic
242257
} else {
@@ -248,7 +263,7 @@ async fn read_topic_detail(
248263
let storage_list = state
249264
.mqtt_context
250265
.storage_driver_manager
251-
.list_storage_resource(&topic.topic_name)
266+
.list_storage_resource(&topic.tenant, &topic.topic_name)
252267
.await?;
253268

254269
let sub_list = if let Some(list) = state
@@ -278,7 +293,10 @@ pub async fn topic_delete(
278293
ValidatedJson(params): ValidatedJson<TopicDeleteRep>,
279294
) -> String {
280295
let topic_storage = TopicStorage::new(state.client_pool.clone());
281-
if let Err(e) = topic_storage.delete_topic(&params.topic_name).await {
296+
if let Err(e) = topic_storage
297+
.delete_topic(&params.tenant, &params.topic_name)
298+
.await
299+
{
282300
return error_response(e.to_string());
283301
}
284302
success_response("success")
@@ -334,6 +352,7 @@ pub async fn topic_rewrite_create(
334352
ValidatedJson(params): ValidatedJson<CreateTopicRewriteReq>,
335353
) -> String {
336354
let rule = MqttTopicRewriteRule {
355+
tenant: params.tenant.clone(),
337356
action: params.action.clone(),
338357
source_topic: params.source_topic.clone(),
339358
dest_topic: params.dest_topic.clone(),
@@ -365,15 +384,20 @@ pub async fn topic_rewrite_delete(
365384
) -> String {
366385
let topic_storage = TopicStorage::new(state.client_pool.clone());
367386
if let Err(e) = topic_storage
368-
.delete_topic_rewrite_rule(params.action.clone(), params.source_topic.clone())
387+
.delete_topic_rewrite_rule(
388+
params.tenant.clone(),
389+
params.action.clone(),
390+
params.source_topic.clone(),
391+
)
369392
.await
370393
{
371394
return error_response(e.to_string());
372395
}
373-
state
374-
.mqtt_context
375-
.cache_manager
376-
.delete_topic_rewrite_rule(&params.action, &params.source_topic);
396+
state.mqtt_context.cache_manager.delete_topic_rewrite_rule(
397+
&params.tenant,
398+
&params.action,
399+
&params.source_topic,
400+
);
377401
state
378402
.mqtt_context
379403
.cache_manager

0 commit comments

Comments
 (0)