Skip to content

Commit ba8707d

Browse files
committed
#AI commit# 开发阶段: 优化spark引擎队列获取方式
1 parent 8fb5aed commit ba8707d

1 file changed

Lines changed: 125 additions & 115 deletions

File tree

  • linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine

linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala

Lines changed: 125 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,8 @@ class DefaultEngineCreateService
209209
engineCreateRequest.getProperties,
210210
engineCreateRequest.getUser,
211211
labelFilter.choseEngineLabel(labelList),
212-
timeout
212+
timeout,
213+
isCreateEngine = true
213214
)
214215
// 4. request resource
215216
val resourceTicketId = resourceManager.requestResource(
@@ -390,13 +391,15 @@ class DefaultEngineCreateService
390391
* @param engineCreateRequest
391392
* @param labelList
392393
* @param timeout
394+
* @param isCreateEngine
393395
* @return
394396
*/
395397
def generateResource(
396398
props: util.Map[String, String],
397399
user: String,
398400
labelList: util.List[Label[_]],
399-
timeout: Long
401+
timeout: Long,
402+
isCreateEngine: Boolean = false
400403
): NodeResource = {
401404
val configProp = engineConnConfigurationService.getConsoleConfiguration(labelList)
402405
if (null != configProp && configProp.asScala.nonEmpty) {
@@ -408,7 +411,7 @@ class DefaultEngineCreateService
408411
}
409412

410413
// Smart queue selection (executed before creating YarnResource)
411-
performSmartQueueSelection(props, labelList)
414+
performSmartQueueSelection(props, labelList, isCreateEngine)
412415

413416
val crossQueue = props.get(AMConfiguration.CROSS_QUEUE)
414417
if (StringUtils.isNotBlank(crossQueue)) {
@@ -435,139 +438,146 @@ class DefaultEngineCreateService
435438
*/
436439
private def performSmartQueueSelection(
437440
properties: util.Map[String, String],
438-
labelList: util.List[Label[_]]
441+
labelList: util.List[Label[_]],
442+
isCreateEngine: Boolean = false
439443
): Unit = {
440444
try {
441-
// 1. Get queue configuration
442-
val primaryQueue = properties.getOrDefault(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, "").trim
443-
val secondaryQueue =
444-
properties.getOrDefault(AMConfiguration.SECONDARY_YARN_QUEUE_NAME_CONFIG_KEY, "").trim
445-
446-
// 2. Get system configuration
447-
val enabled = RMConfiguration.SECONDARY_QUEUE_ENABLED.getValue
448-
val threshold = RMConfiguration.SECONDARY_QUEUE_THRESHOLD.getValue
449-
val supportedEngines = RMConfiguration.SECONDARY_QUEUE_ENGINES.getValue
450-
.split(",")
451-
.map(_.trim)
452-
.map(_.toLowerCase())
453-
.toSet
454-
val supportedCreators = RMConfiguration.SECONDARY_QUEUE_CREATORS.getValue
455-
.split(",")
456-
.map(_.trim)
457-
.map(_.toUpperCase())
458-
.toSet
459-
460-
logger.info(
461-
s"Smart queue config - primary queue: $primaryQueue, secondary queue: $secondaryQueue"
462-
)
445+
if (isCreateEngine) {
446+
// 1. Get queue configuration
447+
val primaryQueue =
448+
properties.getOrDefault(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, "").trim
449+
val secondaryQueue =
450+
properties.getOrDefault(AMConfiguration.SECONDARY_YARN_QUEUE_NAME_CONFIG_KEY, "").trim
451+
452+
// 2. Get system configuration
453+
val enabled = RMConfiguration.SECONDARY_QUEUE_ENABLED.getValue
454+
val threshold = RMConfiguration.SECONDARY_QUEUE_THRESHOLD.getValue
455+
val supportedEngines = RMConfiguration.SECONDARY_QUEUE_ENGINES.getValue
456+
.split(",")
457+
.map(_.trim)
458+
.map(_.toLowerCase())
459+
.toSet
460+
val supportedCreators = RMConfiguration.SECONDARY_QUEUE_CREATORS.getValue
461+
.split(",")
462+
.map(_.trim)
463+
.map(_.toUpperCase())
464+
.toSet
463465

464-
// 3. Check if secondary queue feature is enabled
465-
if (!enabled || StringUtils.isBlank(secondaryQueue) || StringUtils.isBlank(primaryQueue)) {
466466
logger.info(
467-
"Smart queue selection is not enabled or secondary queue is empty, using primary queue"
467+
s"Smart queue config - primary queue: $primaryQueue, secondary queue: $secondaryQueue"
468468
)
469-
return
470-
}
471469

472-
// 4. Get engine type and Creator
473-
var engineType: String = null
474-
var creator: String = null
475-
476-
try {
477-
if (labelList != null && !labelList.isEmpty) {
478-
engineType = LabelUtil.getEngineType(labelList)
479-
val userCreatorLabel = labelList.asScala
480-
.find(_.isInstanceOf[UserCreatorLabel])
481-
.map(_.asInstanceOf[UserCreatorLabel])
482-
.orNull
483-
if (userCreatorLabel != null) {
484-
creator = userCreatorLabel.getCreator
485-
}
470+
// 3. Check if secondary queue feature is enabled
471+
if (!enabled || StringUtils.isBlank(secondaryQueue) || StringUtils.isBlank(primaryQueue)) {
472+
logger.info(
473+
"Smart queue selection is not enabled or secondary queue is empty, using primary queue"
474+
)
475+
return
486476
}
487-
} catch {
488-
case e: Exception =>
489-
logger.error("Failed to parse labels for queue selection", e)
490-
}
491477

492-
// 5. Check if engine type and Creator are in supported list
493-
val engineMatched = engineType == null || supportedEngines.contains(engineType.toLowerCase())
494-
val creatorMatched = creator == null || supportedCreators.contains(creator.toUpperCase())
478+
// 4. Get engine type and Creator
479+
var engineType: String = null
480+
var creator: String = null
481+
482+
try {
483+
if (labelList != null && !labelList.isEmpty) {
484+
engineType = LabelUtil.getEngineType(labelList)
485+
val userCreatorLabel = labelList.asScala
486+
.find(_.isInstanceOf[UserCreatorLabel])
487+
.map(_.asInstanceOf[UserCreatorLabel])
488+
.orNull
489+
if (userCreatorLabel != null) {
490+
creator = userCreatorLabel.getCreator
491+
}
492+
}
493+
} catch {
494+
case e: Exception =>
495+
logger.error("Failed to parse labels for queue selection", e)
496+
}
495497

496-
if (!engineMatched || !creatorMatched) {
497-
logger.info(
498-
s"Engine type or Creator not in supported list - engineType: $engineType (matched: $engineMatched), creator: $creator (matched: $creatorMatched)"
499-
)
500-
return
501-
}
498+
// 5. Check if engine type and Creator are in supported list
499+
val engineMatched =
500+
engineType == null || supportedEngines.contains(engineType.toLowerCase())
501+
val creatorMatched = creator == null || supportedCreators.contains(creator.toUpperCase())
502502

503-
// 6. Query secondary queue resource usage
504-
try {
505-
val labelContainer = labelResourceService.enrichLabels(labelList)
503+
if (!engineMatched || !creatorMatched) {
504+
logger.info(
505+
s"Engine type or Creator not in supported list - engineType: $engineType (matched: $engineMatched), creator: $creator (matched: $creatorMatched)"
506+
)
507+
return
508+
}
506509

507-
val yarnResourceIdentifier = new YarnResourceIdentifier(secondaryQueue)
508-
val queueInfo = externalResourceService.getResource(
509-
ResourceType.Yarn,
510-
labelContainer,
511-
yarnResourceIdentifier
512-
)
510+
// 6. Query secondary queue resource usage
511+
try {
512+
val labelContainer = labelResourceService.enrichLabels(labelList)
513513

514-
if (queueInfo != null) {
515-
val usedResource = queueInfo.getUsedResource.asInstanceOf[YarnResource]
516-
val maxResource = queueInfo.getMaxResource.asInstanceOf[YarnResource]
514+
val yarnResourceIdentifier = new YarnResourceIdentifier(secondaryQueue)
515+
val queueInfo = externalResourceService.getResource(
516+
ResourceType.Yarn,
517+
labelContainer,
518+
yarnResourceIdentifier
519+
)
517520

518-
// 7. Three-dimensional independent judgment
519-
val useSecondaryQueue = if (maxResource != null && maxResource.getQueueMemory > 0) {
520-
val memoryUsage =
521-
usedResource.getQueueMemory.toDouble / maxResource.getQueueMemory.toDouble
522-
val cpuUsage = if (maxResource.getQueueCores > 0) {
523-
usedResource.getQueueCores.toDouble / maxResource.getQueueCores.toDouble
524-
} else {
525-
0.0
526-
}
527-
val instanceUsage = if (maxResource.getQueueInstances > 0) {
528-
usedResource.getQueueInstances.toDouble / maxResource.getQueueInstances.toDouble
521+
if (queueInfo != null) {
522+
val usedResource = queueInfo.getUsedResource.asInstanceOf[YarnResource]
523+
val maxResource = queueInfo.getMaxResource.asInstanceOf[YarnResource]
524+
525+
// 7. Three-dimensional independent judgment
526+
val useSecondaryQueue = if (maxResource != null && maxResource.getQueueMemory > 0) {
527+
val memoryUsage =
528+
usedResource.getQueueMemory.toDouble / maxResource.getQueueMemory.toDouble
529+
val cpuUsage = if (maxResource.getQueueCores > 0) {
530+
usedResource.getQueueCores.toDouble / maxResource.getQueueCores.toDouble
531+
} else {
532+
0.0
533+
}
534+
val instanceUsage = if (maxResource.getQueueInstances > 0) {
535+
usedResource.getQueueInstances.toDouble / maxResource.getQueueInstances.toDouble
536+
} else {
537+
0.0
538+
}
539+
// Do not use secondary queue if any dimension exceeds threshold
540+
val memoryOverThreshold = memoryUsage > threshold
541+
val cpuOverThreshold = cpuUsage > threshold
542+
val instanceOverThreshold = instanceUsage > threshold
543+
544+
if (memoryOverThreshold || cpuOverThreshold || instanceOverThreshold) {
545+
logger.info(
546+
s"Secondary queue resource usage- memory: $memoryOverThreshold, cpu: $cpuOverThreshold, instance: $instanceOverThreshold, using primary queue"
547+
)
548+
false
549+
} else {
550+
logger.info("Secondary queue has sufficient resources, using secondary queue")
551+
true
552+
}
529553
} else {
530-
0.0
531-
}
532-
// Do not use secondary queue if any dimension exceeds threshold
533-
val memoryOverThreshold = memoryUsage > threshold
534-
val cpuOverThreshold = cpuUsage > threshold
535-
val instanceOverThreshold = instanceUsage > threshold
536-
537-
if (memoryOverThreshold || cpuOverThreshold || instanceOverThreshold) {
538-
logger.info(
539-
s"Secondary queue resource usage- memory: $memoryOverThreshold, cpu: $cpuOverThreshold, instance: $instanceOverThreshold, using primary queue"
540-
)
554+
logger.warn("Secondary queue max resource is empty, using primary queue")
541555
false
542-
} else {
543-
logger.info("Secondary queue has sufficient resources, using secondary queue")
544-
true
545556
}
546-
} else {
547-
logger.warn("Secondary queue max resource is empty, using primary queue")
548-
false
549-
}
550557

551-
// 8. Determine which queue to use and update wds.linkis.rm.yarnqueue
552-
val selectedQueue = if (useSecondaryQueue) secondaryQueue else primaryQueue
553-
val oldQueue = properties.get(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY)
554-
properties.put(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, selectedQueue)
558+
// 8. Determine which queue to use and update wds.linkis.rm.yarnqueue
559+
val selectedQueue = if (useSecondaryQueue) secondaryQueue else primaryQueue
560+
val oldQueue = properties.get(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY)
561+
properties.put(AMConfiguration.YARN_QUEUE_NAME_CONFIG_KEY, selectedQueue)
555562

556-
logger.info(
557-
s"Smart queue selection completed - original queue: $oldQueue, selected queue: $selectedQueue"
558-
)
563+
logger.info(
564+
s"Smart queue selection completed - original queue: $oldQueue, selected queue: $selectedQueue"
565+
)
559566

560-
} else {
561-
logger.warn(
562-
s"Unable to get secondary queue $secondaryQueue information, using primary queue: $primaryQueue"
563-
)
564-
}
567+
} else {
568+
logger.warn(
569+
s"Unable to get secondary queue $secondaryQueue information, using primary queue: $primaryQueue"
570+
)
571+
}
565572

566-
} catch {
567-
case e: Exception =>
568-
logger.error(s"Exception in smart queue selection, using primary queue: $primaryQueue", e)
573+
} catch {
574+
case e: Exception =>
575+
logger.error(
576+
s"Exception in smart queue selection, using primary queue: $primaryQueue",
577+
e
578+
)
579+
}
569580
}
570-
571581
} catch {
572582
case e: Exception =>
573583
logger.error(

0 commit comments

Comments
 (0)