-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathConsumer-Notebook.scala
More file actions
188 lines (160 loc) · 7.85 KB
/
Consumer-Notebook.scala
File metadata and controls
188 lines (160 loc) · 7.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Databricks notebook source
import org.apache.spark.eventhubs._
import com.microsoft.azure.eventhubs._
// Event Hubs connection settings: replace every placeholder below before running.
val namespaceName = " << fill here >> "
val eventHubName = " << fill here >> "
val sasKeyName = " << fill here >> "
val sasKey = " << fill here >> "

// Assemble the Event Hubs connection string from the settings above. The builder is fully
// qualified, presumably to avoid ambiguity with the Spark connector's own ConnectionStringBuilder
// brought in by the wildcard imports.
val connStr = {
  val builder = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
  builder.setNamespaceName(namespaceName)
  builder.setEventHubName(eventHubName)
  builder.setSasKeyName(sasKeyName)
  builder.setSasKey(sasKey)
  builder
}

// Connector configuration: read at most 5 events per micro-batch trigger.
val customEventhubParameters = {
  val baseConf = EventHubsConf(connStr.toString())
  baseConf.setMaxEventsPerTrigger(5)
}

// Streaming DataFrame of raw Event Hubs events.
val incomingStream = spark.readStream
  .format("eventhubs")
  .options(customEventhubParameters.toMap)
  .load()

// Uncomment to inspect the raw event schema:
// incomingStream.printSchema
// Uncomment to dump the raw events (delivered in micro-batches) to the console:
// incomingStream.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
// Each Event Hubs row exposes the event payload as a binary `body` column, alongside metadata
// columns such as `offset` and `enqueuedTime`. Project them into readable types; casting the
// body to a string reveals the actual message text.
val messages = incomingStream.select(
  $"offset".cast(LongType).as("Offset"),
  $"enqueuedTime".cast(TimestampType).as("Time (readable)"),
  $"enqueuedTime".cast(LongType).as("Timestamp"),
  $"body".cast(StringType).as("Body")
)
messages.printSchema()

// Print every micro-batch to the console sink without truncating long values.
// awaitTermination() blocks this cell until the query stops or fails.
val consoleQuery = messages.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
consoleQuery.awaitTermination()
// COMMAND ----------
import java.io._
import java.net._
import java.util._
/** Language-detection response: per-document results plus any API errors. */
case class Language(documents: Array[LanguageDocuments], errors: Array[Any])

/** Detected languages for one document, identified by `id`. */
case class LanguageDocuments(id: String, detectedLanguages: Array[DetectedLanguages])

/** One candidate language: display name, ISO 639-1 code and confidence score. */
case class DetectedLanguages(name: String, iso6391Name: String, score: Double)

/** Sentiment response: per-document scores plus any API errors. */
case class Sentiment(documents: Array[SentimentDocuments], errors: Array[Any])

/** Sentiment score for the document identified by `id`. */
case class SentimentDocuments(id: String, score: Double)

/** Request body sent to the Text Analytics endpoints. */
case class RequestToTextApi(documents: Array[RequestToTextApiDocument])

/**
 * One document in a request. `language` is mutable and defaults to empty; it is filled in
 * before calling the sentiment endpoint.
 *
 * Case classes are already Serializable, so no explicit `extends Serializable` is needed for Spark.
 */
case class RequestToTextApiDocument(id: String, text: String, var language: String = "")
// COMMAND ----------
import javax.net.ssl.HttpsURLConnection
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.JsonObject
import com.google.gson.JsonParser
import scala.util.parsing.json._
/**
 * Minimal client for the Azure Cognitive Services Text Analytics v2.1 REST API
 * (language detection and sentiment scoring), used from the sentiment UDF.
 *
 * All public calls are best-effort: any non-fatal failure (network error, HTTP error status,
 * unparseable response) is reported as `None` rather than thrown.
 */
object SentimentDetector extends Serializable {
  import java.nio.charset.StandardCharsets
  import scala.util.control.NonFatal

  // Cognitive Services API connection settings.
  // SECURITY: never commit a real subscription key. Fill this in locally (or load it from a
  // secret store) before running.
  val accessKey = " << fill here >> "
  // val host = "https://cognitive-docs.cognitiveservices.azure.com/"
  val host = "https://mycogservices.cognitiveservices.azure.com/"
  val languagesPath = "/text/analytics/v2.1/languages"
  // calling TextAnalytics API of Azure Cognitive Services
  val sentimentPath = "/text/analytics/v2.1/sentiment"

  // `host` ends with "/" and the paths start with "/"; strip one so the URL contains no "//".
  val languagesUrl = new URL(host.stripSuffix("/") + languagesPath)
  // Name kept (typo included) because other cells may reference it.
  val sentimenUrl = new URL(host.stripSuffix("/") + sentimentPath)

  // Timeouts in milliseconds, so a stalled HTTP call cannot block a Spark task indefinitely.
  val connectTimeoutMs = 10000
  val readTimeoutMs = 30000

  // Gson instances are thread-safe, so one shared instance serves all calls.
  val g = new Gson

  /**
   * Opens a POST connection to `path`, carrying the JSON content type and subscription key.
   *
   * @param path endpoint URL; must be HTTPS
   * @return an unconnected connection ready for writing the request body
   */
  def getConnection(path: URL): HttpsURLConnection = {
    val connection = path.openConnection().asInstanceOf[HttpsURLConnection]
    connection.setRequestMethod("POST")
    connection.setRequestProperty("Content-Type", "text/json")
    connection.setRequestProperty("Ocp-Apim-Subscription-Key", accessKey)
    connection.setConnectTimeout(connectTimeoutMs)
    connection.setReadTimeout(readTimeoutMs)
    connection.setDoOutput(true)
    connection
  }

  /**
   * Re-serializes a JSON object with indentation, e.g. for logging responses.
   *
   * @param json_text JSON text whose top-level value must be an object
   * @return the pretty-printed JSON
   */
  def prettify(json_text: String): String = {
    val json = new JsonParser().parse(json_text).getAsJsonObject()
    new GsonBuilder().setPrettyPrinting().create().toJson(json)
  }

  /**
   * Posts `request` as UTF-8 JSON to `path` and returns the response body.
   *
   * Line breaks in the response are dropped (lines are concatenated), which is harmless for JSON.
   *
   * @param request documents to send
   * @param path    Text Analytics endpoint
   * @return the raw response body
   * @throws java.io.IOException on network errors or an HTTP error status
   */
  def processUsingApi(request: RequestToTextApi, path: URL): String = {
    val payload = g.toJson(request).getBytes(StandardCharsets.UTF_8)
    val connection = getConnection(path)
    try {
      val out = connection.getOutputStream()
      try out.write(payload, 0, payload.length)
      finally out.close()

      // Decode explicitly as UTF-8 rather than the JVM's platform default charset.
      val reader = new BufferedReader(
        new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))
      try {
        val response = new StringBuilder()
        var line = reader.readLine()
        while (line != null) {
          response.append(line)
          line = reader.readLine()
        }
        response.toString()
      } finally reader.close()
    } catch {
      case NonFatal(e) =>
        // On failure, drop the connection instead of returning it to the keep-alive pool.
        connection.disconnect()
        throw e
    }
  }

  /**
   * Calls the language-detection API for the given documents.
   *
   * @param inputDocs documents to analyse
   * @return the parsed response, or None when the call fails, the response has no documents or
   *         detected languages, or the first document's language is "(Unknown)"
   */
  def getLanguage(inputDocs: RequestToTextApi): Option[Language] =
    try {
      val language = g.fromJson(processUsingApi(inputDocs, languagesUrl), classOf[Language])
      // ISO 639-1 code of the first detected language of the first document, if present.
      val firstIso = for {
        parsed <- Option(language)
        docs <- Option(parsed.documents)
        doc <- docs.headOption
        detected <- Option(doc.detectedLanguages)
        best <- detected.headOption
      } yield best.iso6391Name
      firstIso match {
        case Some(iso) if iso != "(Unknown)" => Some(language)
        case _ => None
      }
    } catch {
      // NonFatal lets InterruptedException (Spark task cancellation) and VM errors propagate.
      case NonFatal(_) => None
    }

  /**
   * Calls the sentiment API for the given documents. Each document needs its `language` set.
   *
   * @param inputDocs documents to score
   * @return the parsed response, or None when the call fails or the response is not parseable
   */
  def getSentiment(inputDocs: RequestToTextApi): Option[Sentiment] =
    try {
      val response = processUsingApi(inputDocs, sentimenUrl)
      // Option(...) turns a null parse result (e.g. a JSON `null` body) into None.
      Option(g.fromJson(response, classOf[Sentiment]))
    } catch {
      case NonFatal(_) => None
    }
}
// COMMAND ----------
// User Defined Function that scores the sentiment of a message's text.
// It returns the sentiment score as a string, or a human-readable message when language
// detection or sentiment scoring fails. Each non-blank row costs two HTTP calls
// (language, then sentiment).
val toSentiment =
  udf((textContent: String) => {
    // First error entry of an API response, or a fallback when the response carried none.
    // This never throws, so a malformed response cannot fail the Spark task.
    def firstError(errors: Array[Any]): String =
      Option(errors).flatMap(_.headOption).map(e => s"$e").getOrElse("no error details returned")

    if (textContent == null || textContent.trim.isEmpty) {
      // Nothing to analyse; answer locally instead of spending an API call on it.
      "Couldn't detect language"
    } else {
      // The id only has to be unique within this single-document request, so there is no
      // need to send the whole text a second time as the id.
      val languageRequest = RequestToTextApi(Array(RequestToTextApiDocument("1", textContent)))
      SentimentDetector.getLanguage(languageRequest) match {
        case None => "Couldn't detect language"
        case Some(language) =>
          val isoCode = for {
            docs <- Option(language.documents)
            doc <- docs.headOption
            detected <- Option(doc.detectedLanguages)
            best <- detected.headOption
          } yield best.iso6391Name
          isoCode match {
            case None =>
              "Error happened when getting language: " + firstError(language.errors)
            case Some(iso) =>
              // Build a new request carrying the detected language instead of mutating the old one.
              val sentimentRequest =
                RequestToTextApi(Array(RequestToTextApiDocument("1", textContent, iso)))
              SentimentDetector.getSentiment(sentimentRequest) match {
                case None => "Couldn't detect sentiment"
                case Some(sentiment) =>
                  Option(sentiment.documents).flatMap(_.headOption) match {
                    case Some(scored) => scored.score.toString
                    case None =>
                      "Error happened when getting sentiment: " + firstError(sentiment.errors)
                  }
              }
          }
      }
    }
  })
// COMMAND ----------
// Decode each event body as text ("Content") and attach its score from the toSentiment UDF
// ("Sentiment").
val streamingDataFrame = {
  val contentOnly = incomingStream.selectExpr("cast (body as string) AS Content")
  contentOnly.withColumn("Sentiment", toSentiment($"Content"))
}

// Print the scored stream to the console sink without truncation.
// awaitTermination() blocks this cell until the query stops or fails.
streamingDataFrame.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()