Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions endpoints/openrtb2/auction.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,13 @@ func (deps *endpointDeps) Auction(w http.ResponseWriter, r *http.Request, _ http
ctx := context.Background()

timeout := deps.cfg.AuctionTimeouts.LimitAuctionTimeout(time.Duration(req.TMax) * time.Millisecond)

// adjust tmax for requests with last peek feature enabled
msbConfig := exchange.ExtractMSBInfoReq(req.BidRequest)
if msbConfig.LastPeek.PeekStartTimeMilliSeconds > 0 {
timeout += time.Duration(msbConfig.LastPeek.PeekStartTimeMilliSeconds/2) * time.Millisecond
}

if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, start.Add(timeout))
Expand Down
179 changes: 112 additions & 67 deletions exchange/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,94 @@ func (e *exchange) makeAuctionContext(ctx context.Context, needsCache bool) (auc
return
}

// requestBidderBids handles a single bidder request and sends the result to the channel
func (e *exchange) requestBidderBids(
ctx context.Context,
bidderRequests []BidderRequest,
bidder BidderRequest,
conversions currency.Conversions,
globalPrivacyControlHeader string,
liveAdaptersPreferredMediaType openrtb_ext.PreferredMediaType,
accountDebugAllowed bool,
headerDebugAllowed bool,
experiment *openrtb_ext.Experiment,
bidAdjustments map[string]float64,
tmaxAdjustments *TmaxAdjustmentsPreprocessed,
responseDebugAllowed bool,
alternateBidderCodes openrtb_ext.ExtAlternateBidderCodes,
hookExecutor hookexecution.StageExecutor,
bidAdjustmentRules map[string][]openrtb_ext.Adjustment,
chBids chan *bidResponseWrapper) {
// Here we actually call the adapters and collect the bids.
bidderRunner := e.recoverSafely(bidderRequests, func(bidderRequest BidderRequest, conversions currency.Conversions) {
// Passing in aName so a doesn't change out from under the go routine
if bidderRequest.BidderLabels.Adapter == "" {
logger.Errorf("Exchange: bidlables for %s (%s) missing adapter string", bidderRequest.BidderName, bidderRequest.BidderCoreName)
bidderRequest.BidderLabels.Adapter = bidderRequest.BidderCoreName
}
brw := new(bidResponseWrapper)
brw.bidder = bidderRequest.BidderName
brw.adapter = bidderRequest.BidderCoreName
// Defer basic metrics to insure we capture them after all the values have been set
defer func() {
e.me.RecordAdapterRequest(bidderRequest.BidderLabels)
}()
start := time.Now()

reqInfo := adapters.NewExtraRequestInfo(conversions)
reqInfo.PbsEntryPoint = bidderRequest.BidderLabels.RType
reqInfo.GlobalPrivacyControlHeader = globalPrivacyControlHeader

if len(liveAdaptersPreferredMediaType) > 0 {
if mtype, found := liveAdaptersPreferredMediaType[bidder.BidderName]; found {
reqInfo.PreferredMediaType = mtype
}
}

bidReqOptions := bidRequestOptions{
accountDebugAllowed: accountDebugAllowed,
headerDebugAllowed: headerDebugAllowed,
addCallSignHeader: isAdsCertEnabled(experiment, e.bidderInfo[string(bidderRequest.BidderName)]),
bidAdjustments: bidAdjustments,
tmaxAdjustments: tmaxAdjustments,
bidderRequestStartTime: start,
responseDebugAllowed: responseDebugAllowed,
}
seatBids, extraBidderRespInfo, err := e.adapterMap[bidderRequest.BidderCoreName].requestBid(ctx, bidderRequest, conversions, &reqInfo, e.adsCertSigner, bidReqOptions, alternateBidderCodes, hookExecutor, bidAdjustmentRules)
brw.bidderResponseStartTime = extraBidderRespInfo.respProcessingStartTime

// Add in time reporting
elapsed := time.Since(start)
brw.adapterSeatBids = seatBids
brw.seatNonBidBuilder = extraBidderRespInfo.seatNonBidBuilder
// Structure to record extra tracking data generated during bidding
ae := new(seatResponseExtra)
ae.ResponseTimeMillis = int(elapsed / time.Millisecond)
if len(seatBids) != 0 {
ae.HttpCalls = seatBids[0].HttpCalls
}
// Timing statistics
e.me.RecordAdapterTime(bidderRequest.BidderLabels, elapsed)
bidderRequest.BidderLabels.AdapterBids = bidsToMetric(brw.adapterSeatBids)
bidderRequest.BidderLabels.AdapterErrors = errorsToMetric(err)
// Append any bid validation errors to the error list
ae.Errors = errsToBidderErrors(err)
ae.Warnings = errsToBidderWarnings(err)
brw.adapterExtra = ae
for _, seatBid := range seatBids {
if seatBid != nil {
for _, bid := range seatBid.Bids {
var cpm = float64(bid.Bid.Price * 1000)
e.me.RecordAdapterPrice(bidderRequest.BidderLabels, cpm)
e.me.RecordAdapterBidReceived(bidderRequest.BidderLabels, bid.BidType, bid.Bid.AdM != "")
}
}
}
chBids <- brw
}, chBids)
go bidderRunner(bidder, conversions)
}

// This piece sends all the requests to the bidder adapters and gathers the results.
func (e *exchange) getAllBids(
ctx context.Context,
Expand Down Expand Up @@ -738,75 +826,32 @@ func (e *exchange) getAllBids(

e.me.RecordOverheadTime(metrics.MakeBidderRequests, time.Since(pbsRequestStartTime))

lastPeekBidderRequests := []BidderRequest{}
msbConfig := extractMSBInfoBidders(bidderRequests)
// create a map for quick lookup of last peek bidders
peekBiddersMap := make(map[string]bool)
for _, bidderName := range msbConfig.LastPeek.PeekBidders {
peekBiddersMap[bidderName] = true
}
for _, bidder := range bidderRequests {
// Here we actually call the adapters and collect the bids.
bidderRunner := e.recoverSafely(bidderRequests, func(bidderRequest BidderRequest, conversions currency.Conversions) {
// Passing in aName so a doesn't change out from under the go routine
if bidderRequest.BidderLabels.Adapter == "" {
logger.Errorf("Exchange: bidlables for %s (%s) missing adapter string", bidderRequest.BidderName, bidderRequest.BidderCoreName)
bidderRequest.BidderLabels.Adapter = bidderRequest.BidderCoreName
}
brw := new(bidResponseWrapper)
brw.bidder = bidderRequest.BidderName
brw.adapter = bidderRequest.BidderCoreName
// Defer basic metrics to insure we capture them after all the values have been set
defer func() {
e.me.RecordAdapterRequest(bidderRequest.BidderLabels)
}()
start := time.Now()

reqInfo := adapters.NewExtraRequestInfo(conversions)
reqInfo.PbsEntryPoint = bidderRequest.BidderLabels.RType
reqInfo.GlobalPrivacyControlHeader = globalPrivacyControlHeader

if len(liveAdaptersPreferredMediaType) > 0 {
if mtype, found := liveAdaptersPreferredMediaType[bidder.BidderName]; found {
reqInfo.PreferredMediaType = mtype
}
}
// save last peek bidder requests and process later
bidderName := bidder.BidderName.String()
if peekBiddersMap[bidderName] {
lastPeekBidderRequests = append(lastPeekBidderRequests, bidder)
continue
}
e.requestBidderBids(ctx, bidderRequests, bidder, conversions, globalPrivacyControlHeader, liveAdaptersPreferredMediaType,
accountDebugAllowed, headerDebugAllowed, experiment, bidAdjustments, tmaxAdjustments,
responseDebugAllowed, alternateBidderCodes, hookExecutor, bidAdjustmentRules, chBids)
}

bidReqOptions := bidRequestOptions{
accountDebugAllowed: accountDebugAllowed,
headerDebugAllowed: headerDebugAllowed,
addCallSignHeader: isAdsCertEnabled(experiment, e.bidderInfo[string(bidderRequest.BidderName)]),
bidAdjustments: bidAdjustments,
tmaxAdjustments: tmaxAdjustments,
bidderRequestStartTime: start,
responseDebugAllowed: responseDebugAllowed,
}
seatBids, extraBidderRespInfo, err := e.adapterMap[bidderRequest.BidderCoreName].requestBid(ctx, bidderRequest, conversions, &reqInfo, e.adsCertSigner, bidReqOptions, alternateBidderCodes, hookExecutor, bidAdjustmentRules)
brw.bidderResponseStartTime = extraBidderRespInfo.respProcessingStartTime

// Add in time reporting
elapsed := time.Since(start)
brw.adapterSeatBids = seatBids
brw.seatNonBidBuilder = extraBidderRespInfo.seatNonBidBuilder
// Structure to record extra tracking data generated during bidding
ae := new(seatResponseExtra)
ae.ResponseTimeMillis = int(elapsed / time.Millisecond)
if len(seatBids) != 0 {
ae.HttpCalls = seatBids[0].HttpCalls
}
// Timing statistics
e.me.RecordAdapterTime(bidderRequest.BidderLabels, elapsed)
bidderRequest.BidderLabels.AdapterBids = bidsToMetric(brw.adapterSeatBids)
bidderRequest.BidderLabels.AdapterErrors = errorsToMetric(err)
// Append any bid validation errors to the error list
ae.Errors = errsToBidderErrors(err)
ae.Warnings = errsToBidderWarnings(err)
brw.adapterExtra = ae
for _, seatBid := range seatBids {
if seatBid != nil {
for _, bid := range seatBid.Bids {
var cpm = float64(bid.Bid.Price * 1000)
e.me.RecordAdapterPrice(bidderRequest.BidderLabels, cpm)
e.me.RecordAdapterBidReceived(bidderRequest.BidderLabels, bid.BidType, bid.Bid.AdM != "")
}
}
}
chBids <- brw
}, chBids)
go bidderRunner(bidder, conversions)
// process last peek bidder requests:
if len(lastPeekBidderRequests) > 0 {
for _, bidder := range mspUpdateLastPeekBiddersRequest(chBids, lastPeekBidderRequests, msbConfig.LastPeek, len(bidderRequests)-len(lastPeekBidderRequests)) {
e.requestBidderBids(ctx, bidderRequests, bidder, conversions, globalPrivacyControlHeader, liveAdaptersPreferredMediaType,
accountDebugAllowed, headerDebugAllowed, experiment, bidAdjustments, tmaxAdjustments,
responseDebugAllowed, alternateBidderCodes, hookExecutor, bidAdjustmentRules, chBids)
}
}

// Wait for the bidders to do their thing
Expand Down
166 changes: 166 additions & 0 deletions exchange/msp_exchange.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package exchange

import (
"encoding/json"
"time"

"github.com/buger/jsonparser"
"github.com/golang/glog"
"github.com/prebid/openrtb/v20/openrtb2"
"github.com/prebid/prebid-server/v3/openrtb_ext"
jsonpatch "gopkg.in/evanphx/json-patch.v5"
)

type MSBExt struct {
MSB MSBConfig `json:"msb"`
}

type MSBConfig struct {
LastPeek MSBLastPeekConfig `json:"last_peek"`
}

type MSBLastPeekConfig struct {
// start peek available bids after this
PeekStartTimeMilliSeconds int64 `json:"peek_start_time_miliseconds"`
// list of bidder names that are in last peek tier
PeekBidders []string `json:"peek_bidders"`
}

/*
MSB feature controller(req.ext) example:
{
"ext": {
"msb": {
"last_peek": {
"peek_start_time_miliseconds": 900,
"peek_bidders": ["msp_google", "msp_nova"]
}
}
}
}
MSP server is response for adding above MSB info to requests for certain traffic/placement/exps.
In the above example
there are two peek tiers for bidders:
1. last peek tier (bidders specified in peek_bidders)
2. the rest(normal tier)
after all reponses are ready for normal tier or timeout=peek_start_time_miliseconds(900ms), for bidders in last peek tier peek available responses from normal tier,
get max_available_bid_prices_for_normal_tier and the winner bidder, then write peek_winner_price and peek_winner_bidder to req.imp.ext.bidder
*/

type MSPFloor struct {
Floor float64 `json:"floor"`
PeekWinnerBidder string `json:"peek_winner_bidder"`
PeekWinnerPrice float64 `json:"peek_winner_price"`
}

var mspBidders = map[openrtb_ext.BidderName]int{
openrtb_ext.BidderMspGoogle: 1,
openrtb_ext.BidderMspFbAlpha: 1,
openrtb_ext.BidderMspFbBeta: 1,
openrtb_ext.BidderMspFbGamma: 1,
openrtb_ext.BidderMspNova: 1,
}

// peek max available(ready) bid price and winner bidder from channel without comsuming
func peekChannelAvailableMaxBidPriceWithinTimeout(peekTier string, chBids chan *bidResponseWrapper, lastPeekConfig MSBLastPeekConfig, totalNormalBidders int) (float64, string) {
timeout := time.After(time.Duration(lastPeekConfig.PeekStartTimeMilliSeconds) * time.Millisecond)
maxPrice := 0.0
winnerBidder := ""
hasData := true
peekedRespList := []*bidResponseWrapper{}
availableBidders := []string{}
// keep consuming message from channel until all normal bidder requests are collected or timeout reaches
for hasData {
select {
case resp, ok := <-chBids:
if !ok {
hasData = false
} else {
for _, bids := range resp.adapterSeatBids {
for _, bid := range bids.Bids {
if bid.Bid != nil {
if bid.Bid.Price > maxPrice {
maxPrice = bid.Bid.Price
winnerBidder = resp.bidder.String()
}
}
}
}
peekedRespList = append(peekedRespList, resp)
availableBidders = append(availableBidders, resp.bidder.String())
if len(peekedRespList) == totalNormalBidders {
hasData = false
}
}

case <-timeout:
hasData = false
}

}
// push message back
for _, resp := range peekedRespList {
chBids <- resp
}
glog.Infof("MSB tier %s, peeked from available bidders %v, current max bid price: %f, winner bidder: %s", peekTier, availableBidders, maxPrice, winnerBidder)
return maxPrice, winnerBidder
}

func mspUpdateLastPeekBiddersRequest(
chBids chan *bidResponseWrapper,
lastPeekBidderRequests []BidderRequest,
lastPeekConfig MSBLastPeekConfig,
totalNormalBidders int,
) []BidderRequest {
maxPrice, winnerBidder := peekChannelAvailableMaxBidPriceWithinTimeout("lastPeek", chBids, lastPeekConfig, totalNormalBidders)
for reqIdx := range lastPeekBidderRequests {
for idx := range lastPeekBidderRequests[reqIdx].BidRequest.Imp {
bidder := &lastPeekBidderRequests[reqIdx]
// for msp bidders, update req.imp.ext.bidder with peek_winner_price and peek_winner_bidder
// which will be used later by msp module stage: https://github.com/ParticleMedia/msp/blob/master/pkg/modules/dam_buckets/module/hook_bidder_request.go#L69
if _, found := mspBidders[bidder.BidderName]; found {
extBytes, err := jsonObject(bidder.BidRequest.Imp[idx].Ext, "bidder")
if err == nil {
var impExt MSPFloor
err = json.Unmarshal(extBytes, &impExt)
if err == nil {
impExt.PeekWinnerPrice = maxPrice
impExt.PeekWinnerBidder = winnerBidder
updatedBytes, _ := json.Marshal(impExt)
updatedBidderBytes, _ := jsonpatch.MergePatch(extBytes, updatedBytes)
updatedExtBytes, _ := jsonparser.Set(bidder.BidRequest.Imp[idx].Ext, updatedBidderBytes, "bidder")
bidder.BidRequest.Imp[idx].Ext = updatedExtBytes
}
}
}
}
}
return lastPeekBidderRequests
}

func jsonObject(data []byte, keys ...string) ([]byte, error) {
if result, dataType, _, err := jsonparser.Get(data, keys...); err == nil && dataType == jsonparser.Object {
return result, nil
} else {
return nil, err
}
}

func extractMSBInfoBidders(reqList []BidderRequest) MSBConfig {
if len(reqList) > 0 {
return ExtractMSBInfoReq(reqList[0].BidRequest)
}

return MSBConfig{}
}

func ExtractMSBInfoReq(req *openrtb2.BidRequest) MSBConfig {
var config MSBExt
if req != nil {
err := json.Unmarshal(req.Ext, &config)
if err != nil {
glog.Error("MSB extract bidder config:", err)
}
}
return config.MSB
}