一、Spark的應(yīng)用現(xiàn)狀
1.1Spark需求背景
隨著數(shù)據(jù)規(guī)模的持續(xù)增長(zhǎng),數(shù)據(jù)需求越來(lái)越多,原有的以MapReduce為代表的Hadoop平臺(tái)越來(lái)越顯示出其局限性。主要體現(xiàn)在2點(diǎn):
1) 任務(wù)執(zhí)行時(shí)間比較長(zhǎng)。特別是某些復(fù)雜的SQL任務(wù),或者一些復(fù)雜的機(jī)器學(xué)習(xí)迭代。
2) 不能很好的支持像機(jī)器學(xué)習(xí)、實(shí)時(shí)處理這種新的大數(shù)據(jù)處理需求。
Spark作為新一代大數(shù)據(jù)處理的計(jì)算平臺(tái),使得我們可以用Spark這一種平臺(tái)統(tǒng)一處理數(shù)據(jù)處理的各種復(fù)雜需求,非常好的支持了我們目前現(xiàn)有的業(yè)務(wù)。與原有MapReduce模型相比,其具有下面3個(gè)特點(diǎn):
1) 充分使用內(nèi)存作為框架計(jì)算過(guò)程存儲(chǔ)的介質(zhì),與磁盤(pán)相比大大提高了數(shù)據(jù)讀取速度。利用內(nèi)存緩存,顯著降低算法迭代時(shí)頻繁讀取數(shù)據(jù)的開(kāi)銷(xiāo)。
2) 更好的DAG框架。原有在MapReduce M-R-M-R的模型,在Spark框架下,更類(lèi)似與M-R-R,優(yōu)化掉無(wú)用流程節(jié)點(diǎn)。
3) 豐富的組件支持。如支持對(duì)結(jié)構(gòu)化數(shù)據(jù)執(zhí)行SQL操作的組件Spark-SQL,支持實(shí)時(shí)處理的組件Spark-Streaming,支持機(jī)器學(xué)習(xí)的組件Mllib,支持圖形學(xué)習(xí)的Graphx。
1.2以Spark為核心的數(shù)據(jù)平臺(tái)結(jié)構(gòu)
#FormatImgID_0#
2.1基于SparkStreaming的實(shí)時(shí)處理需求
商業(yè)數(shù)據(jù)部?jī)?nèi)部有大量的實(shí)時(shí)數(shù)據(jù)處理需求,如實(shí)時(shí)廣告收入計(jì)算,實(shí)時(shí)線(xiàn)上ctr預(yù)估,實(shí)時(shí)廣告重定向等,目前主要通過(guò)SparkStreaming完成。
實(shí)時(shí)數(shù)據(jù)處理的第一步,需要有實(shí)時(shí)的數(shù)據(jù)。360的用戶(hù)產(chǎn)品,幾乎全國(guó)各地都部署有機(jī)房,主要有4大主力機(jī)房。實(shí)時(shí)數(shù)據(jù)的收集過(guò)程如下:
1) 使用Apache flume實(shí)時(shí)將服務(wù)器的日志上傳至本地機(jī)房的Kafka,數(shù)據(jù)延遲在100ms以?xún)?nèi)
2) 使用Kafka MirorMaker將各大主力機(jī)房的數(shù)據(jù)匯總至洛陽(yáng)中心機(jī)房,數(shù)據(jù)延遲在200ms以?xún)?nèi)。由于公司的網(wǎng)絡(luò)環(huán)境不是很好,為了保證低延遲,在MirorMaker機(jī)房的機(jī)器上,申請(qǐng)了帶寬的QOS保證,以降低延遲。
數(shù)據(jù)處理的實(shí)時(shí)鏈路如所示:
1) 1種方式是通過(guò)Apache Flume實(shí)時(shí)寫(xiě)入Hdfs,用于第二天全量數(shù)據(jù)的離線(xiàn)計(jì)算
2) 1種方式是通過(guò)SparkSteaming實(shí)時(shí)處理,處理后數(shù)據(jù)會(huì)回流至Kafka或者Redis,便于后續(xù)流程使用。
#FormatImgID_2#
2.2基于SparkSQL和DataFrame的數(shù)據(jù)分析需求
SparkSQL是Spark的核心組件,作為新一代的SQL on Hadoop的解決方案,完美的支持了對(duì)現(xiàn)有Hive數(shù)據(jù)的存取。在與Hive進(jìn)行集成的同時(shí),Spark SQL也提供了JDBC/ODBC接口,便于第三方工具如Tableau、Qlik等通過(guò)該接口接入Spark SQL。
由于之前大部分?jǐn)?shù)據(jù)分析工作都是通過(guò)使用hive命令行完成的,為了將遷移至SparkSQL的代價(jià)最小,360系統(tǒng)部的同事開(kāi)發(fā)了SparkSQL的命令行版本spark-hive。原有的以hive 命令運(yùn)行的腳本,簡(jiǎn)單的改成spark-hive便可以運(yùn)行。360系統(tǒng)部的同事也做了大量兼容性的工作。spark-hive目前已經(jīng)比較穩(wěn)定,成為數(shù)據(jù)分析的首選。
DataFrame是Spark 1.3引入的新API,與RDD類(lèi)似,DataFrame也是一個(gè)分布式數(shù)據(jù)容器。
但與RDD不同的是,DataFrame除了數(shù)據(jù)以外,還掌握更多數(shù)據(jù)的結(jié)構(gòu)信息,即schema。同時(shí),與Hive類(lèi)似,DataFrame也支持嵌套數(shù)據(jù)類(lèi)型(struct、array和map)。從API易用性的角度上 看,DataFrame API提供的是一套高層的關(guān)系操作,比函數(shù)式的RDD API要更加友好,門(mén)檻更低。
大數(shù)據(jù)開(kāi)發(fā)過(guò)程中,可能會(huì)遇到各種類(lèi)型的數(shù)據(jù)源,而DataFrame與生俱來(lái)就支持各種數(shù)據(jù)類(lèi)型,如下圖,包括JSON文件、Parquet文件、Hive表格、本地文件系統(tǒng)、分布式文件系統(tǒng)(HDFS)以及云存儲(chǔ)(S3)。同時(shí),配合JDBC,它還可以讀取外部關(guān)系型數(shù)據(jù)庫(kù)系統(tǒng)如Mysql,Oracle中的數(shù)據(jù)。對(duì)于自帶Schema的數(shù)據(jù)類(lèi)型,如Parquet,DataFrame還能夠自動(dòng)解析列類(lèi)型。
通過(guò)組合使用DataFrame和SparkSQL,與MapReduce比較大大減少了代碼行數(shù),同時(shí)執(zhí)行效率也得到了提升。如下示例是處理廣告主位置信息的scala代碼。
2.3基于MLLib的機(jī)器學(xué)習(xí)需求
360DMP提供人群擴(kuò)展功能(Look-alike)。所謂人群擴(kuò)展,是基于廣告主創(chuàng)建的種子用戶(hù),根據(jù)這些種子用戶(hù)的特征,挖掘、篩選、識(shí)別、拓展更多具有相似特征的用戶(hù),以增加廣告的受眾。
業(yè)界的Look-alike有2種做法。第一種做法就是顯性的定位。廣告主先選中一部分種子用戶(hù),根據(jù)種子用戶(hù)的標(biāo)簽再定位擴(kuò)展一部分其他用戶(hù)。比如如果種子用戶(hù)選擇的都是“化妝品-護(hù)膚”這個(gè)標(biāo)簽,那么根據(jù)這個(gè)標(biāo)簽可以找到其他的用戶(hù),作為擴(kuò)展用戶(hù)。這種做法的缺點(diǎn)是不夠精確,擴(kuò)展出來(lái)的用戶(hù)過(guò)大。第二種方法是通過(guò)一個(gè)機(jī)器學(xué)習(xí)的模型,將問(wèn)題轉(zhuǎn)化為機(jī)器學(xué)習(xí)模型,來(lái)定位廣告主的潛在用戶(hù)。我們采用的是這種方法。
在做Look-alike的過(guò)程中,用到了Spark中的Mlilib庫(kù)。Mlilib算法庫(kù)的核心庫(kù)如上,選擇的是Classification中LR算法,主要原因有兩個(gè):
1)模型比較簡(jiǎn)單,易于理解和實(shí)現(xiàn)
2)模型訓(xùn)練起來(lái)速度比較快,時(shí)間可控。
LookAlike的第一步是建立模型。在這里,廣告主會(huì)首先提交一批種子用戶(hù),作為機(jī)器學(xué)習(xí)的正樣本。其他的非種子用戶(hù)作為負(fù)樣本。于是問(wèn)題就轉(zhuǎn)化為一個(gè)二分類(lèi)的模型,正負(fù)樣本組成學(xué)習(xí)的樣本。訓(xùn)練模型之后,通過(guò)模型預(yù)測(cè),最后得到廣告主需要的目標(biāo)人群。
三、部分經(jīng)驗(yàn)總結(jié)
3.1使用Direct模式處理kafka數(shù)據(jù)
SparkStreaming讀取Kafka數(shù)據(jù)時(shí),有兩種方法:Direct和Receiver。我們選擇的是Direct方法。與基于Receiver的方法相比,Direct具有以下優(yōu)點(diǎn):
1)簡(jiǎn)化并行性:無(wú)需創(chuàng)建多個(gè)輸入Kafka流和聯(lián)合它們。使用directStream,Spark Streaming將創(chuàng)建與要消費(fèi)的Kafka分區(qū)一樣多的RDD分區(qū),這將從Kafka并行讀取數(shù)據(jù)。因此,Kafka和RDD分區(qū)之間存在一對(duì)一映射,這更容易理解和調(diào)整。
2)效率:在第一種方法中實(shí)現(xiàn)零數(shù)據(jù)丟失需要將數(shù)據(jù)存儲(chǔ)在預(yù)寫(xiě)日志中,該日志進(jìn)一步復(fù)制數(shù)據(jù)。這實(shí)際上是低效的,因?yàn)閿?shù)據(jù)有效地被復(fù)制兩次。第二種方法消除了問(wèn)題,因?yàn)闆](méi)有接收器,因此不需要預(yù)寫(xiě)日志。
3)Exactly-once語(yǔ)義:第一種方法使用Kafka的高級(jí)API在Zookeeper中存儲(chǔ)消耗的偏移量。這是傳統(tǒng)上消費(fèi)Kafka數(shù)據(jù)的方式。雖然這種方法(與預(yù)寫(xiě)日志結(jié)合)可以確保零數(shù)據(jù)丟失(即至少一次語(yǔ)義),但是一些記錄在一些故障下可能被消費(fèi)兩次,這是因?yàn)镾park Streaming可靠接收的數(shù)據(jù)與Zookeeper跟蹤的偏移之間存在不一致。因此,在第二種方法中,我們使用不基于Zookeeper的簡(jiǎn)單的Kafka API,偏移由Spark Streaming在其檢查點(diǎn)內(nèi)跟蹤。這消除了Spark Streaming和Zookeeper / Kafka之間的不一致,所以每個(gè)記錄被Spark Streaming有效地接收一次。
Direct方法需要自己控制消費(fèi)的kafka offset,參考代碼如下。
3.2 SparkSQL中使用Parquet
相比傳統(tǒng)的行式存儲(chǔ)引擎,列式存儲(chǔ)引擎因其更高的壓縮比,更少的IO操作而越來(lái)越受到重視。這是因?yàn)樵诨ヂ?lián)網(wǎng)公司的大數(shù)據(jù)應(yīng)用中,大部分情況下,數(shù)據(jù)量很大并且數(shù)據(jù)字段數(shù)目比較多,但是大部分查詢(xún)只是查詢(xún)其中的部分行,部分列。這個(gè)時(shí)候,使用列式存儲(chǔ)就能極大的發(fā)揮其優(yōu)勢(shì)。
Parquet是Spark中優(yōu)先支持的列存方案。與使用文本相比,Parquet 讓 Spark SQL 的性能平均提高了 10 倍,這要感謝初級(jí)的讀取器過(guò)濾器、高效的執(zhí)行計(jì)劃,以及 Spark 1.6.0 中經(jīng)過(guò)改進(jìn)的掃描吞吐量。
SparSQL的Parquet的幾個(gè)操作:
1)創(chuàng)建Parquet格式的Hive表
CREATE TABLE parquet_table(age INT, name STRING) STORED AS PARQUET;
2)讀取Parquet格式的文件
valsqlContext = neworg.apache.spark.sql.SQLContext(sc)
sqlContext.read.parquet("/input/parquet")
3)保存為Parquet格式文件
df.write.parquet("/output/parquet")
3.3 Spark參數(shù)調(diào)優(yōu)
1)spark.sql.shuffle.partitions:在做Join或者Group的時(shí)候,可以通過(guò)適當(dāng)提高該值避免數(shù)據(jù)傾斜。
2)spark.testing.reserveMemory:Spark executor jvm啟動(dòng)的時(shí)候,會(huì)默認(rèn)保留一部分內(nèi)存,默認(rèn)為300m。適當(dāng)?shù)臏p少這個(gè)值,可以增加 spark執(zhí)行時(shí)Storage Memory的值。設(shè)置方式是啟動(dòng)spark shell的時(shí)候加上參數(shù):--conf spark.testing.reservedMemory= 104857600。
3)spark.serializer:Spark內(nèi)部會(huì)涉及到很多對(duì)數(shù)據(jù)進(jìn)行序列化的地方,默認(rèn)使用的是Java的序列化機(jī)制。Spark同時(shí)支持使用Kryo序列化庫(kù),Kryo序列化類(lèi)庫(kù)的性能比Java序列化類(lèi)庫(kù)的性能要高很多。官方介紹,Kryo序列化機(jī)制比Java序列化機(jī)制,性能高10倍左右。Spark之所以默認(rèn)沒(méi)有使用Kryo作為序列化類(lèi)庫(kù),是因?yàn)镵ryo要求最好要注冊(cè)所有需要進(jìn)行序列化的自定義類(lèi)型,因此對(duì)于開(kāi)發(fā)者來(lái)說(shuō),這種方式比較麻煩。設(shè)置方法是conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")。
關(guān)于作者:
王曉偉,360大數(shù)據(jù)開(kāi)發(fā)工程師,從事大數(shù)據(jù)相關(guān)平臺(tái)開(kāi)發(fā)和數(shù)據(jù)倉(cāng)庫(kù)開(kāi)發(fā),曾經(jīng)為多個(gè)開(kāi)源框架,如Yarn、Pig、Hive、Tez貢獻(xiàn)代碼。
關(guān)于360商業(yè)數(shù)據(jù)部:
360商業(yè)數(shù)據(jù)部專(zhuān)注于360自有海量數(shù)據(jù)的深度挖掘及分析,在保護(hù)個(gè)人隱私及數(shù)據(jù)安全前提下,多維分析用戶(hù)需求和偏好,運(yùn)用數(shù)據(jù)挖掘和人工智能技術(shù),以及場(chǎng)景化應(yīng)用全面提升商業(yè)價(jià)值,已形成包括360商易、360DMP和360分析在內(nèi)的數(shù)據(jù)營(yíng)銷(xiāo)產(chǎn)品體系。360商易基于海量數(shù)據(jù)洞察人群畫(huà)像及品牌現(xiàn)狀,為營(yíng)銷(xiāo)決策提供支持;360DMP對(duì)數(shù)據(jù)進(jìn)行整合管理,精準(zhǔn)圈定目標(biāo)人群,提升轉(zhuǎn)化效果;360分析支持推廣效果評(píng)估及流量分析,實(shí)時(shí)優(yōu)化投放。該大數(shù)據(jù)產(chǎn)品體系,結(jié)合360點(diǎn)睛實(shí)效平臺(tái),共同為廣告主提供大數(shù)據(jù)精準(zhǔn)營(yíng)銷(xiāo)閉環(huán)服務(wù)。
文章內(nèi)容僅供閱讀,不構(gòu)成投資建議,請(qǐng)謹(jǐn)慎對(duì)待。投資者據(jù)此操作,風(fēng)險(xiǎn)自擔(dān)。
11月11日,據(jù)網(wǎng)經(jīng)社數(shù)字零售臺(tái)(DR.100EC.CN)數(shù)據(jù)顯示,秋冬服飾仍是雙11的C位,女士針織衫、女士外套、女士羽絨服等位居服飾消費(fèi)前列,女士夾克銷(xiāo)量同比增長(zhǎng)72%,女士棉衣、女士羊毛衫銷(xiāo)量同比增長(zhǎng)50%以上。男士外套銷(xiāo)量同比增長(zhǎng)30%以上。
奧維云網(wǎng)(AVC)推總數(shù)據(jù)顯示,2024年1-9月明火炊具線(xiàn)上零售額94.2億元,同比增加3.1%,其中抖音渠道表現(xiàn)優(yōu)異,同比有14%的漲幅,傳統(tǒng)電商略有下滑,同比降低2.3%。
“以前都要去窗口辦,一套流程下來(lái)都要半個(gè)月了,現(xiàn)在方便多了!”打開(kāi)“重慶公積金”微信小程序,按照提示流程提交相關(guān)材料,僅幾秒鐘,重慶市民曾某的賬戶(hù)就打進(jìn)了21600元。
華碩ProArt創(chuàng)藝27 Pro PA279CRV顯示器,憑借其優(yōu)秀的性能配置和精準(zhǔn)的色彩呈現(xiàn)能力,為您的創(chuàng)作工作帶來(lái)實(shí)質(zhì)性的幫助,雙十一期間低至2799元,性?xún)r(jià)比很高,簡(jiǎn)直是創(chuàng)作者們的首選。
9月14日,2024全球工業(yè)互聯(lián)網(wǎng)大會(huì)——工業(yè)互聯(lián)網(wǎng)標(biāo)識(shí)解析專(zhuān)題論壇在沈陽(yáng)成功舉辦。