From 5540589657feba42869c55302f76234684ace635 Mon Sep 17 00:00:00 2001 From: shun jiang Date: Thu, 4 Dec 2025 15:34:27 -0800 Subject: [PATCH] msp last peek feature --- endpoints/openrtb2/auction.go | 7 ++ exchange/exchange.go | 179 +++++++++++++++++++++------------- exchange/msp_exchange.go | 166 +++++++++++++++++++++++++++++++ 3 files changed, 285 insertions(+), 67 deletions(-) create mode 100644 exchange/msp_exchange.go diff --git a/endpoints/openrtb2/auction.go b/endpoints/openrtb2/auction.go index ee29584a61e..e1704122b04 100644 --- a/endpoints/openrtb2/auction.go +++ b/endpoints/openrtb2/auction.go @@ -210,6 +210,13 @@ func (deps *endpointDeps) Auction(w http.ResponseWriter, r *http.Request, _ http ctx := context.Background() timeout := deps.cfg.AuctionTimeouts.LimitAuctionTimeout(time.Duration(req.TMax) * time.Millisecond) + + // adjust tmax for requests with last peek feature enabled + msbConfig := exchange.ExtractMSBInfoReq(req.BidRequest) + if msbConfig.LastPeek.PeekStartTimeMilliSeconds > 0 { + timeout += time.Duration(msbConfig.LastPeek.PeekStartTimeMilliSeconds/2) * time.Millisecond + } + if timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithDeadline(ctx, start.Add(timeout)) diff --git a/exchange/exchange.go b/exchange/exchange.go index ef24e3fc9c7..1be5d3a9c47 100644 --- a/exchange/exchange.go +++ b/exchange/exchange.go @@ -710,6 +710,94 @@ func (e *exchange) makeAuctionContext(ctx context.Context, needsCache bool) (auc return } +// requestBidderBids handles a single bidder request and sends the result to the channel +func (e *exchange) requestBidderBids( + ctx context.Context, + bidderRequests []BidderRequest, + bidder BidderRequest, + conversions currency.Conversions, + globalPrivacyControlHeader string, + liveAdaptersPreferredMediaType openrtb_ext.PreferredMediaType, + accountDebugAllowed bool, + headerDebugAllowed bool, + experiment *openrtb_ext.Experiment, + bidAdjustments map[string]float64, + tmaxAdjustments *TmaxAdjustmentsPreprocessed, + responseDebugAllowed bool, + alternateBidderCodes openrtb_ext.ExtAlternateBidderCodes, + hookExecutor hookexecution.StageExecutor, + bidAdjustmentRules map[string][]openrtb_ext.Adjustment, + chBids chan *bidResponseWrapper) { + // Here we actually call the adapters and collect the bids. + bidderRunner := e.recoverSafely(bidderRequests, func(bidderRequest BidderRequest, conversions currency.Conversions) { + // Passing in aName so a doesn't change out from under the go routine + if bidderRequest.BidderLabels.Adapter == "" { + logger.Errorf("Exchange: bidlables for %s (%s) missing adapter string", bidderRequest.BidderName, bidderRequest.BidderCoreName) + bidderRequest.BidderLabels.Adapter = bidderRequest.BidderCoreName + } + brw := new(bidResponseWrapper) + brw.bidder = bidderRequest.BidderName + brw.adapter = bidderRequest.BidderCoreName + // Defer basic metrics to insure we capture them after all the values have been set + defer func() { + e.me.RecordAdapterRequest(bidderRequest.BidderLabels) + }() + start := time.Now() + + reqInfo := adapters.NewExtraRequestInfo(conversions) + reqInfo.PbsEntryPoint = bidderRequest.BidderLabels.RType + reqInfo.GlobalPrivacyControlHeader = globalPrivacyControlHeader + + if len(liveAdaptersPreferredMediaType) > 0 { + if mtype, found := liveAdaptersPreferredMediaType[bidder.BidderName]; found { + reqInfo.PreferredMediaType = mtype + } + } + + bidReqOptions := bidRequestOptions{ + accountDebugAllowed: accountDebugAllowed, + headerDebugAllowed: headerDebugAllowed, + addCallSignHeader: isAdsCertEnabled(experiment, e.bidderInfo[string(bidderRequest.BidderName)]), + bidAdjustments: bidAdjustments, + tmaxAdjustments: tmaxAdjustments, + bidderRequestStartTime: start, + responseDebugAllowed: responseDebugAllowed, + } + seatBids, extraBidderRespInfo, err := e.adapterMap[bidderRequest.BidderCoreName].requestBid(ctx, bidderRequest, conversions, &reqInfo, e.adsCertSigner, bidReqOptions, alternateBidderCodes, hookExecutor, bidAdjustmentRules) + brw.bidderResponseStartTime = extraBidderRespInfo.respProcessingStartTime + + // Add in time reporting + elapsed := time.Since(start) + brw.adapterSeatBids = seatBids + brw.seatNonBidBuilder = extraBidderRespInfo.seatNonBidBuilder + // Structure to record extra tracking data generated during bidding + ae := new(seatResponseExtra) + ae.ResponseTimeMillis = int(elapsed / time.Millisecond) + if len(seatBids) != 0 { + ae.HttpCalls = seatBids[0].HttpCalls + } + // Timing statistics + e.me.RecordAdapterTime(bidderRequest.BidderLabels, elapsed) + bidderRequest.BidderLabels.AdapterBids = bidsToMetric(brw.adapterSeatBids) + bidderRequest.BidderLabels.AdapterErrors = errorsToMetric(err) + // Append any bid validation errors to the error list + ae.Errors = errsToBidderErrors(err) + ae.Warnings = errsToBidderWarnings(err) + brw.adapterExtra = ae + for _, seatBid := range seatBids { + if seatBid != nil { + for _, bid := range seatBid.Bids { + var cpm = float64(bid.Bid.Price * 1000) + e.me.RecordAdapterPrice(bidderRequest.BidderLabels, cpm) + e.me.RecordAdapterBidReceived(bidderRequest.BidderLabels, bid.BidType, bid.Bid.AdM != "") + } + } + } + chBids <- brw + }, chBids) + go bidderRunner(bidder, conversions) +} + // This piece sends all the requests to the bidder adapters and gathers the results. func (e *exchange) getAllBids( ctx context.Context, @@ -738,75 +826,32 @@ func (e *exchange) getAllBids( e.me.RecordOverheadTime(metrics.MakeBidderRequests, time.Since(pbsRequestStartTime)) + lastPeekBidderRequests := []BidderRequest{} + msbConfig := extractMSBInfoBidders(bidderRequests) + // create a map for quick lookup of last peek bidders + peekBiddersMap := make(map[string]bool) + for _, bidderName := range msbConfig.LastPeek.PeekBidders { + peekBiddersMap[bidderName] = true + } for _, bidder := range bidderRequests { - // Here we actually call the adapters and collect the bids. - bidderRunner := e.recoverSafely(bidderRequests, func(bidderRequest BidderRequest, conversions currency.Conversions) { - // Passing in aName so a doesn't change out from under the go routine - if bidderRequest.BidderLabels.Adapter == "" { - logger.Errorf("Exchange: bidlables for %s (%s) missing adapter string", bidderRequest.BidderName, bidderRequest.BidderCoreName) - bidderRequest.BidderLabels.Adapter = bidderRequest.BidderCoreName - } - brw := new(bidResponseWrapper) - brw.bidder = bidderRequest.BidderName - brw.adapter = bidderRequest.BidderCoreName - // Defer basic metrics to insure we capture them after all the values have been set - defer func() { - e.me.RecordAdapterRequest(bidderRequest.BidderLabels) - }() - start := time.Now() - - reqInfo := adapters.NewExtraRequestInfo(conversions) - reqInfo.PbsEntryPoint = bidderRequest.BidderLabels.RType - reqInfo.GlobalPrivacyControlHeader = globalPrivacyControlHeader - - if len(liveAdaptersPreferredMediaType) > 0 { - if mtype, found := liveAdaptersPreferredMediaType[bidder.BidderName]; found { - reqInfo.PreferredMediaType = mtype - } - } + // save last peek bidder requests and process later + bidderName := bidder.BidderName.String() + if peekBiddersMap[bidderName] { + lastPeekBidderRequests = append(lastPeekBidderRequests, bidder) + continue + } + e.requestBidderBids(ctx, bidderRequests, bidder, conversions, globalPrivacyControlHeader, liveAdaptersPreferredMediaType, + accountDebugAllowed, headerDebugAllowed, experiment, bidAdjustments, tmaxAdjustments, + responseDebugAllowed, alternateBidderCodes, hookExecutor, bidAdjustmentRules, chBids) + } - bidReqOptions := bidRequestOptions{ - accountDebugAllowed: accountDebugAllowed, - headerDebugAllowed: headerDebugAllowed, - addCallSignHeader: isAdsCertEnabled(experiment, e.bidderInfo[string(bidderRequest.BidderName)]), - bidAdjustments: bidAdjustments, - tmaxAdjustments: tmaxAdjustments, - bidderRequestStartTime: start, - responseDebugAllowed: responseDebugAllowed, - } - seatBids, extraBidderRespInfo, err := e.adapterMap[bidderRequest.BidderCoreName].requestBid(ctx, bidderRequest, conversions, &reqInfo, e.adsCertSigner, bidReqOptions, alternateBidderCodes, hookExecutor, bidAdjustmentRules) - brw.bidderResponseStartTime = extraBidderRespInfo.respProcessingStartTime - - // Add in time reporting - elapsed := time.Since(start) - brw.adapterSeatBids = seatBids - brw.seatNonBidBuilder = extraBidderRespInfo.seatNonBidBuilder - // Structure to record extra tracking data generated during bidding - ae := new(seatResponseExtra) - ae.ResponseTimeMillis = int(elapsed / time.Millisecond) - if len(seatBids) != 0 { - ae.HttpCalls = seatBids[0].HttpCalls - } - // Timing statistics - e.me.RecordAdapterTime(bidderRequest.BidderLabels, elapsed) - bidderRequest.BidderLabels.AdapterBids = bidsToMetric(brw.adapterSeatBids) - bidderRequest.BidderLabels.AdapterErrors = errorsToMetric(err) - // Append any bid validation errors to the error list - ae.Errors = errsToBidderErrors(err) - ae.Warnings = errsToBidderWarnings(err) - brw.adapterExtra = ae - for _, seatBid := range seatBids { - if seatBid != nil { - for _, bid := range seatBid.Bids { - var cpm = float64(bid.Bid.Price * 1000) - e.me.RecordAdapterPrice(bidderRequest.BidderLabels, cpm) - e.me.RecordAdapterBidReceived(bidderRequest.BidderLabels, bid.BidType, bid.Bid.AdM != "") - } - } - } - chBids <- brw - }, chBids) - go bidderRunner(bidder, conversions) + // process last peek bidder requests: + if len(lastPeekBidderRequests) > 0 { + for _, bidder := range mspUpdateLastPeekBiddersRequest(chBids, lastPeekBidderRequests, msbConfig.LastPeek, len(bidderRequests)-len(lastPeekBidderRequests)) { + e.requestBidderBids(ctx, bidderRequests, bidder, conversions, globalPrivacyControlHeader, liveAdaptersPreferredMediaType, + accountDebugAllowed, headerDebugAllowed, experiment, bidAdjustments, tmaxAdjustments, + responseDebugAllowed, alternateBidderCodes, hookExecutor, bidAdjustmentRules, chBids) + } } // Wait for the bidders to do their thing diff --git a/exchange/msp_exchange.go b/exchange/msp_exchange.go new file mode 100644 index 00000000000..a7b60d307ac --- /dev/null +++ b/exchange/msp_exchange.go @@ -0,0 +1,166 @@ +package exchange + +import ( + "encoding/json" + "time" + + "github.com/buger/jsonparser" + "github.com/golang/glog" + "github.com/prebid/openrtb/v20/openrtb2" + "github.com/prebid/prebid-server/v3/openrtb_ext" + jsonpatch "gopkg.in/evanphx/json-patch.v5" +) + +type MSBExt struct { + MSB MSBConfig `json:"msb"` +} + +type MSBConfig struct { + LastPeek MSBLastPeekConfig `json:"last_peek"` +} + +type MSBLastPeekConfig struct { + // start peek available bids after this + PeekStartTimeMilliSeconds int64 `json:"peek_start_time_miliseconds"` + // list of bidder names that are in last peek tier + PeekBidders []string `json:"peek_bidders"` +} + +/* +MSB feature controller(req.ext) example: +{ + "ext": { + "msb": { + "last_peek": { + "peek_start_time_miliseconds": 900, + "peek_bidders": ["msp_google", "msp_nova"] + } + } + } +} +MSP server is response for adding above MSB info to requests for certain traffic/placement/exps. +In the above example + there are two peek tiers for bidders: + 1. last peek tier (bidders specified in peek_bidders) + 2. the rest(normal tier) + after all reponses are ready for normal tier or timeout=peek_start_time_miliseconds(900ms), for bidders in last peek tier peek available responses from normal tier, + get max_available_bid_prices_for_normal_tier and the winner bidder, then write peek_winner_price and peek_winner_bidder to req.imp.ext.bidder +*/ + +type MSPFloor struct { + Floor float64 `json:"floor"` + PeekWinnerBidder string `json:"peek_winner_bidder"` + PeekWinnerPrice float64 `json:"peek_winner_price"` +} + +var mspBidders = map[openrtb_ext.BidderName]int{ + openrtb_ext.BidderMspGoogle: 1, + openrtb_ext.BidderMspFbAlpha: 1, + openrtb_ext.BidderMspFbBeta: 1, + openrtb_ext.BidderMspFbGamma: 1, + openrtb_ext.BidderMspNova: 1, +} + +// peek max available(ready) bid price and winner bidder from channel without comsuming +func peekChannelAvailableMaxBidPriceWithinTimeout(peekTier string, chBids chan *bidResponseWrapper, lastPeekConfig MSBLastPeekConfig, totalNormalBidders int) (float64, string) { + timeout := time.After(time.Duration(lastPeekConfig.PeekStartTimeMilliSeconds) * time.Millisecond) + maxPrice := 0.0 + winnerBidder := "" + hasData := true + peekedRespList := []*bidResponseWrapper{} + availableBidders := []string{} + // keep consuming message from channel until all normal bidder requests are collected or timeout reaches + for hasData { + select { + case resp, ok := <-chBids: + if !ok { + hasData = false + } else { + for _, bids := range resp.adapterSeatBids { + for _, bid := range bids.Bids { + if bid.Bid != nil { + if bid.Bid.Price > maxPrice { + maxPrice = bid.Bid.Price + winnerBidder = resp.bidder.String() + } + } + } + } + peekedRespList = append(peekedRespList, resp) + availableBidders = append(availableBidders, resp.bidder.String()) + if len(peekedRespList) == totalNormalBidders { + hasData = false + } + } + + case <-timeout: + hasData = false + } + + } + // push message back + for _, resp := range peekedRespList { + chBids <- resp + } + glog.Infof("MSB tier %s, peeked from available bidders %v, current max bid price: %f, winner bidder: %s", peekTier, availableBidders, maxPrice, winnerBidder) + return maxPrice, winnerBidder +} + +func mspUpdateLastPeekBiddersRequest( + chBids chan *bidResponseWrapper, + lastPeekBidderRequests []BidderRequest, + lastPeekConfig MSBLastPeekConfig, + totalNormalBidders int, +) []BidderRequest { + maxPrice, winnerBidder := peekChannelAvailableMaxBidPriceWithinTimeout("lastPeek", chBids, lastPeekConfig, totalNormalBidders) + for reqIdx := range lastPeekBidderRequests { + for idx := range lastPeekBidderRequests[reqIdx].BidRequest.Imp { + bidder := &lastPeekBidderRequests[reqIdx] + // for msp bidders, update req.imp.ext.bidder with peek_winner_price and peek_winner_bidder + // which will be used later by msp module stage: https://github.com/ParticleMedia/msp/blob/master/pkg/modules/dam_buckets/module/hook_bidder_request.go#L69 + if _, found := mspBidders[bidder.BidderName]; found { + extBytes, err := jsonObject(bidder.BidRequest.Imp[idx].Ext, "bidder") + if err == nil { + var impExt MSPFloor + err = json.Unmarshal(extBytes, &impExt) + if err == nil { + impExt.PeekWinnerPrice = maxPrice + impExt.PeekWinnerBidder = winnerBidder + updatedBytes, _ := json.Marshal(impExt) + updatedBidderBytes, _ := jsonpatch.MergePatch(extBytes, updatedBytes) + updatedExtBytes, _ := jsonparser.Set(bidder.BidRequest.Imp[idx].Ext, updatedBidderBytes, "bidder") + bidder.BidRequest.Imp[idx].Ext = updatedExtBytes + } + } + } + } + } + return lastPeekBidderRequests +} + +func jsonObject(data []byte, keys ...string) ([]byte, error) { + if result, dataType, _, err := jsonparser.Get(data, keys...); err == nil && dataType == jsonparser.Object { + return result, nil + } else { + return nil, err + } +} + +func extractMSBInfoBidders(reqList []BidderRequest) MSBConfig { + if len(reqList) > 0 { + return ExtractMSBInfoReq(reqList[0].BidRequest) + } + + return MSBConfig{} +} + +func ExtractMSBInfoReq(req *openrtb2.BidRequest) MSBConfig { + var config MSBExt + if req != nil { + err := json.Unmarshal(req.Ext, &config) + if err != nil { + glog.Error("MSB extract bidder config:", err) + } + } + return config.MSB +}