日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Scalaz(58)- scalaz-stream: fs2-并行运算示范,fs2 parallel processing

發布時間:2025/3/15 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Scalaz(58)- scalaz-stream: fs2-并行运算示范,fs2 parallel processing 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

? ? 從表面上來看,Stream代表一連串無窮數據元素。一連串的意思是元素有固定的排列順序,所以對元素的運算也必須按照順序來:完成了前面的運算再跟著進行下一個元素的運算。這樣來看,Stream應該不是很好的并行運算工具。但是,fs2所支持的并行運算方式不是以數據元素而是以?Stream為運算單位的:fs2支持多個Stream同時進行運算,如merge函數。所以fs2使Stream的并行運算成為了可能。

一般來說,我們可能在Stream的幾個狀態節點要求并行運算:

1、同時運算多個數據源頭來產生不排序的數據元素

2、同時對獲取的一連串數據元素進行處理,如:map(update),filter等等

3、同時將一連串數據元素無序存入終點(Sink)

我們可以創建一個例子來示范fs2的并行運算:?模擬從3個文件中讀取字串,然后統計在這3個文件中母音出現的次數。假設文件讀取和母音統計是有任意時間延遲的(latency),我們看看如何進行并行運算及并行運算能有多少效率上的提升。我們先設定一些跟蹤和模擬延遲的幫助函數:

1 def log[A](prompt: String): Pipe[Task,A,A] = _.evalMap { a => Task.delay{ println(s"$prompt>"); a }} 2 //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A] 3 def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.evalMap { a => 4 val delay: Task[Int] = Task.delay { scala.util.Random.nextInt(max.toMillis.toInt) } 5 delay.flatMap {d => Task.now(a).schedule(d.millis) } 6 } //> randomDelay: [A](max: scala.concurrent.duration.FiniteDuration)fs2.Pipe[fs2.

log是個跟蹤函數,randomDelay是個延遲模擬函數,模擬在max內的任意時間延遲。

與scalaz-stream-0.8不同,fs2重新實現了文件操作功能:不再依賴java的字串(string)處理功能。也不再依賴scodec的二進制數據轉換功能。下面是fs2的文件讀取方法示范:

1 val s1 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicBackend.scala"),1024) 2 //> s1 : fs2.Stream[fs2.Task,Byte] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>) 3 val s2 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/DatabaseConfig.scala"),1024) 4 //> s2 : fs2.Stream[fs2.Task,Byte] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>) 5 val s3 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicProfile.scala"),1024) 6 //> s3 : fs2.Stream[fs2.Task,Byte] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>)

fs2.io.file.readAll函數的款式如下:

def readAll[F[_]](path: Path, chunkSize: Int)(implicit F: Effect[F]): Stream[F, Byte] ={...}

readAll分批(by chunks)從文件中讀取Byte類型數據(當返回數據量小于chunkSize代表完成讀取),返回結果類型是Stream[F,Byte]。我們需要進行Byte>>>String轉換及分行等處理。fs2在text對象里提供了相關函數:

object text {private val utf8Charset = Charset.forName("UTF-8")/** Converts UTF-8 encoded byte stream to a stream of `String`. */def utf8Decode[F[_]]: Pipe[F, Byte, String] =_.chunks.through(utf8DecodeC)/** Converts UTF-8 encoded `Chunk[Byte]` inputs to `String`. */def utf8DecodeC[F[_]]: Pipe[F, Chunk[Byte], String] = {/*** Returns the number of continuation bytes if `b` is an ASCII byte or a* leading byte of a multi-byte sequence, and -1 otherwise.*/def continuationBytes(b: Byte): Int = {if ((b & 0x80) == 0x00) 0 // ASCII byteelse if ((b & 0xE0) == 0xC0) 1 // leading byte of a 2 byte seqelse if ((b & 0xF0) == 0xE0) 2 // leading byte of a 3 byte seqelse if ((b & 0xF8) == 0xF0) 3 // leading byte of a 4 byte seqelse -1 // continuation byte or garbage } ... /** Encodes a stream of `String` in to a stream of bytes using the UTF-8 charset. */def utf8Encode[F[_]]: Pipe[F, String, Byte] =_.flatMap(s => Stream.chunk(Chunk.bytes(s.getBytes(utf8Charset))))/** Encodes a stream of `String` in to a stream of `Chunk[Byte]` using the UTF-8 charset. */def utf8EncodeC[F[_]]: Pipe[F, String, Chunk[Byte]] =_.map(s => Chunk.bytes(s.getBytes(utf8Charset)))/** Transforms a stream of `String` such that each emitted `String` is a line from the input. */def lines[F[_]]: Pipe[F, String, String] = { ...

utf8Encode,utf8Decode,lines這幾個函數正是我們需要的,它們都是Pipe類型。我們可以把這幾個Pipe直接用through接到Stream上:

1 val startTime = System.currentTimeMillis //> startTime : Long = 1472444756321 2 val s1lines = s1.through(text.utf8Decode).through(text.lines) 3 .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun 4 //> s1lines : Int = 479 5 println(s"reading s1 $s1lines lines in ${System.currentTimeMillis - startTime}ms") 6 //> reading s1 479 lines in 5370ms 7 8 val startTime2 = System.currentTimeMillis //> startTime2 : Long = 1472444761691 9 val s2lines = s2.through(text.utf8Decode).through(text.lines) 10 .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun 11 //> s2lines : Int = 174 12 println(s"reading s2 $s2lines lines in ${System.currentTimeMillis - startTime2}ms") 13 //> reading s2 174 lines in 1923ms 14 val startTime3 = System.currentTimeMillis //> startTime3 : Long = 1472444763614 15 val s3lines = s3.through(text.utf8Decode).through(text.lines) 16 .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun 17 //> s3lines : Int = 174 18 println(s"reading s3 $s3lines lines in ${System.currentTimeMillis - startTime3}ms") 19 //> reading s3 174 lines in 1928ms 20 println(s"reading all three files ${s1lines+s2lines+s3lines} total lines in ${System.currentTimeMillis - startTime}ms") 21 //> reading all three files 827 total lines in 9221ms

在以上的例子里我們用runFold函數統計文件的文字行數并在讀取過程中用randomDelay來制造了隨意長度的拖延。上面3個文件的字串讀取和轉換處理一共877行、9221ms。

我們知道fs2的并行運算函數concurrent.join函數類型款式是這樣的:

def join[F[_],O](maxOpen: Int)(outer: Stream[F,Stream[F,O]])(implicit F: Async[F]): Stream[F,O] = {...}

join運算的對象outer是個兩層Stream(Streams of Stream):Stream[F,Stream[F,P]],我們需要先進行類型款式調整:

1 val lines1 = s1.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis)) 2 //> lines1 : fs2.Stream[fs2.Task,String] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>) 3 val lines2 = s2.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis)) 4 //> lines2 : fs2.Stream[fs2.Task,String] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>) 5 val lines3 = s3.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis)) 6 //> lines3 : fs2.Stream[fs2.Task,String] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>) 7 val ss: Stream[Task,Stream[Task,String]] = Stream(lines1,lines2,lines3) 8 //> ss : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,String]] = Segment(Emit(Chunk(evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>), evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>), evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>))))

現在這個ss的類型復合我們的要求。我們可以測試一下并行運算的效率:

1 val ss_start = System.currentTimeMillis //> ss_start : Long = 1472449962698 2 val ss_lines = fs2.concurrent.join(3)(ss).runFold(0)((b,_) => b + 1).unsafeRun 3 //> ss_lines : Int = 827 4 println(s"parallel reading all files ${ss_lines} total lines in ${System.currentTimeMillis - ss_start}ms") 5 //> parallel reading all files 827 total lines in 5173ms

讀取同等行數但只用了5173ms,與之前的9221ms相比,大約有成倍的提速。

join(3)(ss)返回了一個合并的Stream,類型是Stream[Task,String]。我們可以運算這個Stream里母音出現的頻率。我們先設計這個統計函數:

1 //c 是個vowl 2 def vowls(c: Char): Boolean = List('A','E','I','O','U').contains(c) 3 //> vowls: (c: Char)Boolean 4 //直接用scala標準庫實現 5 def pipeVowlsCount: Pipe[Task,String,Map[Char,Int]] = 6 _.evalMap (text => Task.delay{ 7 text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size) 8 }.schedule((text.length / 10).millis)) //> pipeVowlsCount: => fs2.Pipe[fs2.Task,String,Map[Char,Int]]

注意我們使用了text => Task.delay{...}.schedule(d),實際上我們完全可以用 text => Thread.sleep(d),但是這樣會造成了不純代碼,所以我們用evalMap來實現純代碼運算。試試統計全部字串內母音出現的總數:

1 import scalaz.{Monoid} 2 //為runFold提供一個Map[Char,Int]Monoid實例 3 implicit object mapMonoid extends Monoid[Map[Char,Int]] { 4 def zero: Map[Char,Int] = Map() 5 def append(m1: Map[Char,Int], m2: => Map[Char,Int]): Map[Char,Int] = { 6 (m1.keySet ++ m2.keySet).map { k => 7 (k, m1.getOrElse(k,0) + m2.getOrElse(k,0)) 8 }.toMap 9 } 10 } 11 val vc_start = System.currentTimeMillis //> vc_start : Long = 1472464772465 12 val vowlsLine = fs2.concurrent.join(3)(ss).through(pipeVowlsCount) 13 .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun 14 ?//> vowlsLine : scala.collection.immutable.Map[Char,Int] = Map(E -> 3381, U - ?838, A -> 2361, I -> 2031, O -> 1824) 15 println(s"parallel reading all files and counted vowls sequencially in ${System.currentTimeMillis - vc_start}ms") 16 //> parallel reading all files and counted vowls sequencially in 10466ms

我們必須為runFold提供一個Monoid[Map[Char,Int]]實例mapMonoid。

那?我們又如何實現統計功能的并行運算呢??fs2.concurrent.join(maxOpen)(...)函數能把一個Stream截成maxOpen數的子Stream,然后對這些子Stream進行并行運算。那么我們又如何轉換Stream[F,Stream[F,O]]類型呢?我們必須把Stream[F,O]的O升格成Stream[F,O]。我們先用一個函數來把O轉換成Map[Char,Int],然后把這個函數升格成Stream[Task,Map[Char,Int],這個可以用Stream.eval實現:

1 def fVowlsCount(text: String): Map[Char,Int] = 2 text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size) 3 //> fVowlsCount: (text: String)Map[Char,Int] 4 val parVowlsLine: Stream[Task,Stream[Task,Map[Char,Int]]] = fs2.concurrent.join(3)(ss) 5 .map {text => Stream.eval(Task {fVowlsCount(text)}.schedule((text.length / 10).millis))} 6 //> parVowlsLine : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,Map[Char,Int]]] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>).mapChunks(<function1>)

我們來檢查一下運行效率:

1 val parvc_start = System.currentTimeMillis //> parvc_start : Long = 1472465844694 2 fs2.concurrent.join(8)(parVowlsLine) 3 .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun 4 //> res0: scala.collection.immutable.Map[Char,Int] = Map(E -> 3381, U -> 838, A-> 2361, I -> 2031, O -> 1824) 5 println(s"parallel reading all files and counted vowls in ${System.currentTimeMillis - parvc_start}ms") 6 //> parallel reading all files and counted vowls in 4984ms

并行運算只需要4985ms,而流程運算需要10466+(9221-5173)=14xxx,這里有3,4倍的速度提升。

下面是這次討論的示范源代碼:

1 import fs2._ 2 import scala.language.{higherKinds,implicitConversions,postfixOps} 3 import scala.concurrent.duration._ 4 object fs2Merge { 5 implicit val strategy = Strategy.fromFixedDaemonPool(4) 6 implicit val scheduler = Scheduler.fromFixedDaemonPool(2) 7 def log[A](prompt: String): Pipe[Task,A,A] = _.evalMap { a => Task.delay{ println(s"$prompt>"); a }} 8 def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.evalMap { a => 9 val delay: Task[Int] = Task.delay { scala.util.Random.nextInt(max.toMillis.toInt) } 10 delay.flatMap {d => Task.now(a).schedule(d.millis) } 11 } 12 13 val s1 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicBackend.scala"),1024) 14 val s2 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/DatabaseConfig.scala"),1024) 15 val s3 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicProfile.scala"),1024) 16 17 18 val startTime = System.currentTimeMillis 19 val s1lines = s1.through(text.utf8Decode).through(text.lines) 20 .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun 21 println(s"reading s1 $s1lines lines in ${System.currentTimeMillis - startTime}ms") 22 23 val startTime2 = System.currentTimeMillis 24 val s2lines = s2.through(text.utf8Decode).through(text.lines) 25 .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun 26 println(s"reading s2 $s2lines lines in ${System.currentTimeMillis - startTime2}ms") 27 val startTime3 = System.currentTimeMillis 28 val s3lines = s3.through(text.utf8Decode).through(text.lines) 29 .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun 30 println(s"reading s3 $s3lines lines in ${System.currentTimeMillis - startTime3}ms") 31 println(s"reading all three files ${s1lines+s2lines+s3lines} total lines in ${System.currentTimeMillis - startTime}ms") 32 val lines1 = s1.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis)) 33 val lines2 = s2.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis)) 34 val lines3 = s3.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis)) 35 val ss: Stream[Task,Stream[Task,String]] = Stream(lines1,lines2,lines3) 36 val ss_start = System.currentTimeMillis 37 val ss_lines = fs2.concurrent.join(3)(ss).runFold(0)((b,_) => b + 1).unsafeRun 38 println(s"parallel reading all files ${ss_lines} total lines in ${System.currentTimeMillis - ss_start}ms") 39 40 //c 是個vowl 41 def vowls(c: Char): Boolean = List('A','E','I','O','U').contains(c) 42 //直接用scala標準庫實現 43 def pipeVowlsCount: Pipe[Task,String,Map[Char,Int]] = 44 _.evalMap (text => Task.delay{ 45 text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size) 46 }.schedule((text.length / 10).millis)) 47 48 import scalaz.{Monoid} 49 //為runFold提供一個Map[Char,Int]Monoid實例 50 implicit object mapMonoid extends Monoid[Map[Char,Int]] { 51 def zero: Map[Char,Int] = Map() 52 def append(m1: Map[Char,Int], m2: => Map[Char,Int]): Map[Char,Int] = { 53 (m1.keySet ++ m2.keySet).map { k => 54 (k, m1.getOrElse(k,0) + m2.getOrElse(k,0)) 55 }.toMap 56 } 57 } 58 val vc_start = System.currentTimeMillis 59 val vowlsLine = fs2.concurrent.join(3)(ss).through(pipeVowlsCount) 60 .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun 61 println(s"parallel reading all files and counted vowls sequencially in ${System.currentTimeMillis - vc_start}ms") 62 def fVowlsCount(text: String): Map[Char,Int] = 63 text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size) 64 val parVowlsLine: Stream[Task,Stream[Task,Map[Char,Int]]] = fs2.concurrent.join(3)(ss) 65 .map {text => Stream.eval(Task {fVowlsCount(text)}.schedule((text.length / 10).millis))} 66 val parvc_start = System.currentTimeMillis 67 fs2.concurrent.join(8)(parVowlsLine) 68 .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun 69 println(s"parallel reading all files and counted vowls in ${System.currentTimeMillis - parvc_start}ms") 70 }

?

?

?

?

?

?

?

?

?

?

?

轉載于:https://www.cnblogs.com/tiger-xc/p/5820446.html

總結

以上是生活随笔為你收集整理的Scalaz(58)- scalaz-stream: fs2-并行运算示范,fs2 parallel processing的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 97在线观看视频免费 | 日韩欧美不卡在线 | 性――交――性――乱a | 免费观看日本 | 12av毛片| 日韩午夜在线播放 | 色婷婷在线播放 | 成年人黄色录像 | 一区二区美女 | 日韩欧美国产高清91 | 亚洲第一区在线播放 | 一区二区三区在线观看视频 | 久草网站 | 亚洲婷婷综合网 | 中文字幕在线播放不卡 | 午夜视频在线免费观看 | 色吧av| 日韩最新av | 亚洲欧美日韩另类 | 天天干 夜夜操 | 日韩激情在线观看 | 嫩草大剧院 | 91手机在线观看 | www欧美com| 亚洲av第一成肉网 | 夜夜天天 | 国产精品久久久毛片 | 久久夜色精品 | 久久国产精品毛片 | 9色av| 欧美a∨亚洲欧美亚洲 | 一区二区三区国产av | 香蕉视频影院 | 亚洲国产成人91精品 | 成人在线免费看视频 | 人妻无码中文字幕免费视频蜜桃 | 女人18岁毛片| 91视频亚洲 | 一区二区三区黄色录像 | 久久久久久av无码免费网站下载 | 久久久青草| 最新啪啪网站 | 午夜尤物| 免费se99se | 日本三级在线 | 日日射日日干 | 成人性生交大片免费看vrv66 | 中文字幕日日 | 成人h动漫精品一区二 | 日韩视频一 | 天天操狠狠操夜夜操 | 久久福利精品 | 亚洲视频在线观看视频 | 香蕉传媒 | 亚洲一区二区av | 超碰在线观看99 | 男人和女人日批视频 | 少妇2做爰交换朴银狐 | 美女100%露胸无遮挡 | 日韩二级片 | av免费影院| 国产麻豆精品在线观看 | 欧美视频一二三 | 成人免费视频网站 | 超碰狠狠干 | 欧美俄罗斯乱妇 | 灌满闺乖女h高h调教尿h | 亚洲午夜精品久久 | av片免费在线 | 久草一本| 久久午夜精品人妻一区二区三区 | 久久白虎 | 人妻少妇一区 | 久久久久久久久久一区 | 农村少妇久久久久久久 | 夜夜嗷| 欧美狠狠爱 | 中文字幕丰满乱子伦无码专区 | 一区二区片 | 日韩一区二区视频在线播放 | 无码人妻精品一区二区三区9厂 | 国产极品美女高潮无套嗷嗷叫酒店 | 日韩少妇内射免费播放 | 国产无遮挡又黄又爽免费网站 | 国产美女免费观看 | 日韩午夜网站 | av福利网址 | 天堂аⅴ在线最新版在线 | 国产精品久久久久久三级 | 中文字幕第23页 | 国产精品videossex久久发布 | 四虎一级片| 久久偷看各类女兵18女厕嘘嘘 | 强videoshd酒醉 | 成全世界免费高清观看 | 欧美疯狂做受 | 女生脱裤子让男生捅 | 日韩狠狠 | 久久伊人超碰 |