SDP(13): Scala.Future - far from completion, and not something you can fire and forget

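In the previous few posts on database engines, many of the functions returned results of type scala.Future, because I assumed this was an easy way to get non-blocking behaviour: however complex a data-processing operation is, just wrap it in a Future{...}, hand it to the system, and move on to the rest of the program. At the Shenzhen Scala user meetup on March 17 I gave a talk on functional programming in Scala, in which I mentioned that scala.Future is currently the most widely used functional component. I think this is fairly common among Scala users: everyone regards it as the most direct way to achieve non-blocking code. However, thinking back on scala.Future after the meetup, I suddenly realized that it is a strict (eagerly evaluated) value. Look at the following example: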

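  import scala.concurrent._
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global

  // The body starts running as soon as the Future is constructed
  val fs = Future {println("run now..."); System.currentTimeMillis() }
                                         //> run now...
                                         //| fs  : scala.concurrent.Future[Long] = List()
  Await.result(fs, 1.second)             //> res0: Long = 1465907784714
  Thread.sleep(1000)
  // Same timestamp: the body is not evaluated a second time
  Await.result(fs, 1.second)             //> res1: Long = 1465907784714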

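As you can see, fs is evaluated immediately when the Future is constructed, and only once. If a scala Future contains side-effecting code, the side effects happen as soon as it is built. So we cannot use scala Future to write pure functions, as the following shows: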
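The loss of purity can also be seen with the standard library alone. The sketch below is only an illustration: the counter and the names StrictFutureDemo, sharedTwice and inlinedTwice are hypothetical and stand in for any side effect. Binding a Future to a val and reusing it gives a different result from writing the same expression inline twice, so the two forms cannot be substituted for each other.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object StrictFutureDemo {
  // Hypothetical counter standing in for any side effect (a DB write, a log line, ...)
  val counter = new AtomicInteger(0)

  // The Future is built once and referenced twice: the body runs once,
  // so both components of the pair carry the same value.
  def sharedTwice(): (Int, Int) = {
    val f = Future { counter.incrementAndGet() }
    val both = for { a <- f; b <- f } yield (a, b)
    Await.result(both, 5.seconds)
  }

  // The same expression written inline twice: the body runs twice,
  // so the two components differ.
  def inlinedTwice(): (Int, Int) = {
    val both = for {
      a <- Future { counter.incrementAndGet() }
      b <- Future { counter.incrementAndGet() }
    } yield (a, b)
    Await.result(both, 5.seconds)
  }
}
```

With a lazily evaluated type the two methods would behave identically, which is the property the rest of this post is after.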

val progA: Future[A] = for {
    b <- readFromB
    _ <- writeToLocationA(b)   // side effect: updates location A
    r <- getResult
} yield r

/* location A content updated */

... /* later */

val progB: Future[B] = for {
    a <- readFromA
    _ <- updateLocationA       // side effect: updates location A again
    c <- getResult
} yield c

...

val program: Future[Unit] = for {
    _ <- progA
    _ <- progB
} yield()

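The ultimate goal in the example above is to run program, which is composed of two subprograms, progA and progB. Both subprograms start running the moment they are constructed and may update locationA at any time, producing side effects. Imagine progA and progB buried in a large body of other source code: the result of program becomes unpredictable. In other words, using Future for functional composition is digging a hole for yourself; at the very least you must keep track of the order in which these Futures are constructed, which is practically impossible in a large collaborative software project. Besides not composing safely, scala.Future also lacks control over evaluation and threading, for example: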

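No control over when evaluation starts

No control over which thread it runs on

No way to cancel a computation once it has started

No effective exception-handling mechanisms such as fallback and retry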
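To make these points concrete, here is a minimal sketch of what a deferred wrapper around Future could look like, using only the standard library. LazyTask, retry, fallbackTo and LazyTaskDemo are hypothetical names invented for this illustration; this is not how scalaz or monix implement Task. The wrapper only stores a recipe for starting a Future, so the caller decides when it starts and on which ExecutionContext, and a retry becomes possible because the recipe can be run again. Cancellation is not covered by this sketch.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for a Task: nothing runs until `run` is called.
final class LazyTask[A](start: ExecutionContext => Future[A]) {
  // Start the computation on the execution context chosen by the caller.
  def run(implicit ec: ExecutionContext): Future[A] = start(ec)

  def map[B](f: A => B): LazyTask[B] =
    new LazyTask(ec => start(ec).map(f)(ec))

  def flatMap[B](f: A => LazyTask[B]): LazyTask[B] =
    new LazyTask(ec => start(ec).flatMap(a => f(a).run(ec))(ec))

  // On failure, run the recipe again, up to `maxRetries` more times.
  def retry(maxRetries: Int): LazyTask[A] =
    new LazyTask(ec =>
      start(ec).recoverWith {
        case _ if maxRetries > 0 => retry(maxRetries - 1).run(ec)
      }(ec))

  // On failure, run another task instead.
  def fallbackTo(that: LazyTask[A]): LazyTask[A] =
    new LazyTask(ec => start(ec).recoverWith { case _ => that.run(ec) }(ec))
}

object LazyTask {
  // Capture the body by name so it is not evaluated at construction time.
  def apply[A](body: => A): LazyTask[A] =
    new LazyTask(ec => Future(body)(ec))
}

object LazyTaskDemo {
  import scala.concurrent.ExecutionContext.Implicits.global

  val attempts = new AtomicInteger(0)

  // Fails on the first two attempts, succeeds on the third.
  val flaky: LazyTask[Int] = LazyTask {
    val n = attempts.incrementAndGet()
    if (n < 3) sys.error(s"attempt $n failed") else n
  }

  def runWithRetry(): Int = Await.result(flaky.retry(5).run, 5.seconds)
}
```

Constructing flaky does not increment attempts; only runWithRetry() does. A plain Future cannot offer retry in this way because a failed Future holds a finished result, not a recipe that can be run again.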

Both the scalaz and monix libraries provide a Task type to complement Future for functional composition. scalaz.Task is built on scalaz.Future:

sealed abstract class Future[+A] {
...
object Future {
  case class Now[+A](a: A) extends Future[A]
  case class Async[+A](onFinish: (A => Trampoline[Unit]) => Unit) extends Future[A]
  case class Suspend[+A](thunk: () => Future[A]) extends Future[A]
  case class BindSuspend[A,B](thunk: () => Future[A], f: A => Future[B]) extends Future[B]
  case class BindAsync[A,B](onFinish: (A => Trampoline[Unit]) => Unit,
                            f: A => Future[B]) extends Future[B]
...

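scalaz.Future[A] is clearly a Free Monad. Its structure is expressed by the cases Now, Async, Suspend, BindSuspend and BindAsync. flatMap can be implemented purely in terms of these structures, which is why Future is a Free Monad: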

def flatMap[B](f: A => Future[B]): Future[B] = this match {
    case Now(a) => Suspend(() => f(a))
    case Suspend(thunk) => BindSuspend(thunk, f)
    case Async(listen) => BindAsync(listen, f)
    case BindSuspend(thunk, g) =>
      Suspend(() => BindSuspend(thunk, g andThen (_ flatMap f)))
    case BindAsync(listen, g) =>
      Suspend(() => BindAsync(listen, g andThen (_ flatMap f)))
  }

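Because free-structure types separate the description of a computation from its execution, we can use scalaz.Future to describe what a program does without actually running anything. So if progA and progB in the example above were Tasks, constructing program would be safe, because side effects only occur when we finally call Task.run. scalaz.Task adds exception handling and other features on top of scalaz.Future.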
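The separation of description from execution can be sketched with a much smaller structure than scalaz.Future. The following is an illustration only: Prog, Now, Suspend, Bind, delay and run are hypothetical names, the cases are simplified (there is no Async case), and it does not reproduce the scalaz implementation. flatMap only builds data, and a separate interpreter, run, is the single place where effects happen.

```scala
import scala.annotation.tailrec

// A program is just data describing what to do.
sealed trait Prog[A] {
  def flatMap[B](f: A => Prog[B]): Prog[B] = Prog.Bind(this, f)
  def map[B](f: A => B): Prog[B] = flatMap(a => Prog.Now(f(a)))
}

object Prog {
  final case class Now[A](a: A) extends Prog[A]
  final case class Suspend[A](resume: () => A) extends Prog[A]
  final case class Bind[A, B](source: Prog[A], f: A => Prog[B]) extends Prog[B]

  // Capture an effectful expression without evaluating it.
  def delay[A](a: => A): Prog[A] = Suspend(() => a)

  // The interpreter: effects happen only here.
  @tailrec
  def run[A](p: Prog[A]): A = p match {
    case Now(a)          => a
    case Suspend(resume) => resume()
    case Bind(source, f) => source match {
      case Now(a)          => run(f(a))
      case Suspend(resume) => run(f(resume()))
      // Re-associate nested binds so the loop stays in tail position.
      case Bind(inner, g)  => run(inner.flatMap(a => g(a).flatMap(f)))
    }
  }
}

object ProgDemo {
  val log = scala.collection.mutable.ListBuffer.empty[String]

  val progA: Prog[Int] = Prog.delay { log += "A"; 1 }
  val progB: Prog[Int] = Prog.delay { log += "B"; 2 }

  // Composition builds a bigger description; nothing is logged yet.
  val program: Prog[Int] = for { a <- progA; b <- progB } yield a + b
}
```

Building ProgDemo.program leaves the log empty; calling Prog.run(ProgDemo.program) appends "A" then "B" and returns 3, and running it again repeats the effects in the same order. The order of effects is fixed by the composition, not by the order in which the vals were constructed.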

monix.Task uses deferred evaluation to separate description from execution. These are the basic building blocks of the type:

  /** [[Task]] state describing an immediate synchronous value. */
  private[eval] final case class Now[A](value: A) extends Task[A] {...}
  /** [[Task]] state describing a non-strict synchronous value. */
  private[eval] final case class Eval[A](thunk: () => A)
    extends Task[A]

  /** Internal state, the result of [[Task.defer]] */
  private[eval] final case class Suspend[+A](thunk: () => Task[A])
    extends Task[A]

  /** Internal [[Task]] state that is the result of applying `flatMap`. */
  private[eval] final case class FlatMap[A, B](source: Task[A], f: A => Task[B])
    extends Task[B]
 /** Internal [[Coeval]] state that is the result of applying `map`. */
  private[eval] final case class Map[S, +A](source: Task[S], f: S => A, index: Int)
    extends Task[A] with (S => Task[A]) {

    def apply(value: S): Task[A] =
      new Now(f(value))
    override def toString: String =
      super[Task].toString
  }

  /** Constructs a lazy [[Task]] instance whose result will
    * be computed asynchronously.
    *
    * Unsafe to build directly, only use if you know what you're doing.
    * For building `Async` instances safely, see [[create]].
    */
  private[eval] final case class Async[+A](register: (Context, Callback[A]) => Unit)
    extends Task[A]  

The following shows how these structures are used to construct a monix.Task:

object Task extends TaskInstancesLevel1 {
  /** Returns a new task that, when executed, will emit the result of
    * the given function, executed asynchronously.
    *
    * This operation is the equivalent of:
    * {{{
    *   Task.eval(f).executeAsync
    * }}}
    *
    * @param f is the callback to execute asynchronously
    */
  def apply[A](f: => A): Task[A] =
    eval(f).executeAsync

  /** Returns a `Task` that on execution is always successful, emitting
    * the given strict value.
    */
  def now[A](a: A): Task[A] =
    Task.Now(a)

  /** Lifts a value into the task context. Alias for [[now]]. */
  def pure[A](a: A): Task[A] = now(a)

  /** Returns a task that on execution is always finishing in error
    * emitting the specified exception.
    */
  def raiseError[A](ex: Throwable): Task[A] =
    Error(ex)

  /** Promote a non-strict value representing a Task to a Task of the
    * same type.
    */
  def defer[A](fa: => Task[A]): Task[A] =
    Suspend(fa _)
...}
    source match {
      case Task.Now(v) => F.pure(v)
      case Task.Error(e) => F.raiseError(e)
      case Task.Eval(thunk) => F.delay(thunk())
      case Task.Suspend(thunk) => F.suspend(to(thunk()))
      case other => suspend(other)(F)
    }

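The Suspend structure is the core of deferred evaluation. monix.Task is a newer solution that borrows many concepts and methods from scalaz.Task while adding many optimizations and extra features, and its GitHub repository is actively updated. Using monix.Task should be the right choice.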

First we need to handle conversion between scala.Future and monix.Task:

  import scala.concurrent.Future
  import monix.eval.Task
  import monix.execution.Scheduler.Implicits.global

  // The by-name parameter keeps the Future from starting before the Task runs
  final class FutureToTask[A](x: => Future[A]) {
    def asTask: Task[A] = Task.deferFuture[A](x)
  }

  final class TaskToFuture[A](x: => Task[A]) {
    def asFuture: Future[A] = x.runAsync
  }
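The by-name parameter (x: => Future[A]) in the conversion above matters: with a by-value parameter the Future would already be running before it is wrapped. The standard-library sketch below illustrates the difference under that reading; ByNameDemo, query, wrapStrict, wrapLazy and await are hypothetical names, and a plain () => Future[A] thunk stands in for a deferred Task.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ByNameDemo {
  val started = new AtomicInteger(0)

  // Hypothetical effectful call: starts a new Future on every invocation.
  def query(): Future[Int] = Future { started.incrementAndGet() }

  // By-value: the argument is evaluated (and the Future started) at the call site,
  // so the thunk always hands back the same, already running Future.
  def wrapStrict[A](fa: Future[A]): () => Future[A] = () => fa

  // By-name: the argument expression is re-evaluated each time the thunk is called,
  // so every call starts a fresh Future.
  def wrapLazy[A](fa: => Future[A]): () => Future[A] = () => fa

  // Small helper for blocking on a result in examples.
  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```

Calling wrapLazy(query()) starts nothing until the returned thunk is invoked, and each invocation runs query() again; wrapStrict(query()) starts the query immediately and every invocation of the thunk returns the same result.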

Here is a complete Task example:

import scala.concurrent._
import scala.util._
import scala.concurrent.duration._
import monix.eval.Task
import monix.execution._
object MonixTask extends App {
import monix.execution.Scheduler.Implicits.global

  // Executing a sum, which (due to the semantics of apply)
  // will happen on another thread. Nothing happens on building
  // this instance though, this expression is pure, being
  // just a spec! Task by default has lazy behavior ;-)
  val task = Task { 1 + 1 }

  // Tasks get evaluated only on runAsync!
  // Callback style:
  val cancelable = task.runOnComplete {
      case Success(value) =>
        println(value)
      case Failure(ex) =>
        System.out.println(s"ERROR: ${ex.getMessage}")
  }
  //=> 2

  // If we change our mind...
  cancelable.cancel()

  // Or you can convert it into a Future
  val future: CancelableFuture[Int] =
    task.runAsync

  // Printing the result asynchronously
  future.foreach(println)
  //=> 2

  // Task.now takes a strict argument, so the effect runs at construction time
  val taskNow = Task.now { println("Effect"); "Hello!" }
  //=> Effect
  // taskNow: monix.eval.Task[String] = Delay(Now(Hello!))
}

Next, let's look at the various ways of constructing a Task:

  /* ------ taskNow ----*/
  val taskNow = Task.now { println("Effect"); "Hello!" }
  //=> Effect
  // taskNow: monix.eval.Task[String] = Delay(Now(Hello!))

  /* --------taskDelay, possibly on another thread ------*/
  val taskDelay = Task { println("Effect"); "Hello!" }
  // taskDelay: monix.eval.Task[String] = Delay(Always())

  taskDelay.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  // The evaluation (and thus all contained side effects)
  // gets triggered on each runAsync:
  taskDelay.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  /* --------taskOnce ------- */
  val taskOnce = Task.evalOnce { println("Effect"); "Hello!" }
  // taskOnce: monix.eval.Task[String] = EvalOnce()

  taskOnce.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  // Result was memoized on the first run!
  taskOnce.runAsync.foreach(println)
  //=> Hello!

  /* --------taskFork ------- */
  // this guarantees that our task will get executed asynchronously:
  val taskFork = Task.eval("Hello!").executeAsync
  //val taskFork = Task.fork(Task.eval("Hello!"))

  // The default scheduler
  import monix.execution.Scheduler.Implicits.global

  // Creating a special scheduler meant for I/O
  import monix.execution.Scheduler
  lazy val io = Scheduler.io(name="my-io")
  //Then we can manage what executes on which:

  // Override the default Scheduler by fork:
  val source = Task(println(s"Running on thread: ${Thread.currentThread.getName}"))
  val forked = source.executeOn(io,true)
  // val forked = Task.fork(source, io)

  source.runAsync
  //=> Running on thread: ForkJoinPool-1-worker-1
  forked.runAsync
  //=> Running on thread: my-io-4

  /* --------taskError ------- */
  import scala.concurrent.TimeoutException

  val taskError = Task.raiseError[Int](new TimeoutException)
  // taskError: monix.eval.Task[Int] =
  //   Delay(Error(java.util.concurrent.TimeoutException))

  taskError.runOnComplete(result => println(result))
  //=> Failure(java.util.concurrent.TimeoutException)

Here are some of the control functions:

  final def doOnFinish(f: Option[Throwable] => Task[Unit]): Task[A] =
  final def doOnCancel(callback: Task[Unit]): Task[A] =
  final def onCancelRaiseError(e: Throwable): Task[A] =
  final def onErrorRecoverWith[B >: A](pf: PartialFunction[Throwable, Task[B]]): Task[B] =
  final def onErrorHandleWith[B >: A](f: Throwable => Task[B]): Task[B] =
  final def onErrorFallbackTo[B >: A](that: Task[B]): Task[B] =
  final def restartUntil(p: (A) => Boolean): Task[A] =
  final def onErrorRestart(maxRetries: Long): Task[A] =
  final def onErrorRestartIf(p: Throwable => Boolean): Task[A] =
  final def onErrorRestartLoop[S, B >: A](initial: S)(f: (Throwable, S, S => Task[B]) => Task[B]): Task[B] =
  final def onErrorHandle[U >: A](f: Throwable => U): Task[U] =
  final def onErrorRecover[U >: A](pf: PartialFunction[Throwable, U]): Task[U] =

A Task is actually run, asynchronously or synchronously, through the runAsync and runSync families of methods:

  def runAsync(implicit s: Scheduler): CancelableFuture[A] =
  def runAsync(cb: Callback[A])(implicit s: Scheduler): Cancelable =
  def runAsyncOpt(implicit s: Scheduler, opts: Options): CancelableFuture[A] =
  def runAsyncOpt(cb: Callback[A])(implicit s: Scheduler, opts: Options): Cancelable =
  final def runSyncMaybe(implicit s: Scheduler): Either[CancelableFuture[A], A] =
  final def runSyncMaybeOpt(implicit s: Scheduler, opts: Options): Either[CancelableFuture[A], A] = 
  final def runSyncUnsafe(timeout: Duration)
    (implicit s: Scheduler, permit: CanBlock): A =
  final def runSyncUnsafeOpt(timeout: Duration)
    (implicit s: Scheduler, opts: Options, permit: CanBlock): A =
  final def runOnComplete(f: Try[A] => Unit)(implicit s: Scheduler): Cancelable =

The following demonstrates two common ways of running a Task:

  // 1.second / 2.seconds avoid the need for scala.language.postfixOps
  val task1 = Task {println("sum:"); 1+2}.delayExecution(1.second)
  println(task1.runSyncUnsafe(2.seconds))
  
  task1.runOnComplete {
    case Success(r) => println(s"result: $r")
    case Failure(e) => println(e.getMessage)
  }

Here is the source code for this demonstration:

import scala.util._
import scala.concurrent.duration._
import monix.eval.Task
import monix.execution._
object MonixTask extends App {
import monix.execution.Scheduler.Implicits.global



  // Executing a sum, which (due to the semantics of apply)
  // will happen on another thread. Nothing happens on building
  // this instance though, this expression is pure, being
  // just a spec! Task by default has lazy behavior ;-)
  val task = Task { 1 + 1 }

  // Tasks get evaluated only on runAsync!
  // Callback style:
  val cancelable = task.runOnComplete {
      case Success(value) =>
        println(value)
      case Failure(ex) =>
        System.out.println(s"ERROR: ${ex.getMessage}")
  }
  //=> 2

  // If we change our mind...
  cancelable.cancel()

  // Or you can convert it into a Future
  val future: CancelableFuture[Int] =
    task.runAsync

  // Printing the result asynchronously
  future.foreach(println)
  //=> 2

  /* ------ taskNow ----*/
  val taskNow = Task.now { println("Effect"); "Hello!" }
  //=> Effect
  // taskNow: monix.eval.Task[String] = Delay(Now(Hello!))

  /* --------taskDelay, possibly on another thread ------*/
  val taskDelay = Task { println("Effect"); "Hello!" }
  // taskDelay: monix.eval.Task[String] = Delay(Always())

  taskDelay.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  // The evaluation (and thus all contained side effects)
  // gets triggered on each runAsync:
  taskDelay.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  /* --------taskOnce ------- */
  val taskOnce = Task.evalOnce { println("Effect"); "Hello!" }
  // taskOnce: monix.eval.Task[String] = EvalOnce()

  taskOnce.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  // Result was memoized on the first run!
  taskOnce.runAsync.foreach(println)
  //=> Hello!

  /* --------taskFork ------- */
  // this guarantees that our task will get executed asynchronously:
  val taskFork = Task.eval("Hello!").executeAsync
  //val taskFork = Task.fork(Task.eval("Hello!"))

  // The default scheduler
  import monix.execution.Scheduler.Implicits.global

  // Creating a special scheduler meant for I/O
  import monix.execution.Scheduler
  lazy val io = Scheduler.io(name="my-io")
  //Then we can manage what executes on which:

  // Override the default Scheduler by fork:
  val source = Task(println(s"Running on thread: ${Thread.currentThread.getName}"))
  val forked = source.executeOn(io,true)
  // val forked = Task.fork(source, io)

  source.runAsync
  //=> Running on thread: ForkJoinPool-1-worker-1
  forked.runAsync
  //=> Running on thread: my-io-4

  /* --------taskError ------- */
  import scala.concurrent.TimeoutException

  val taskError = Task.raiseError[Int](new TimeoutException)
  // taskError: monix.eval.Task[Int] =
  //   Delay(Error(java.util.concurrent.TimeoutException))

  taskError.runOnComplete(result => println(result))
  //=> Failure(java.util.concurrent.TimeoutException)

  

  // 1.second / 2.seconds avoid the need for scala.language.postfixOps
  val task1 = Task {println("sum:"); 1+2}.delayExecution(1.second)
  println(task1.runSyncUnsafe(2.seconds))

  task1.runOnComplete {
    case Success(r) => println(s"result: $r")
    case Failure(e) => println(e.getMessage)
  }

}
