src/main/scala/CouchStream.scala
changeset 4 b25c02bd6b11
parent 0 a279a342bc78
child 5 141e76d946e3
equal deleted inserted replaced
3:e8daf3ff497d 4:b25c02bd6b11
     1 package couchdb.changes
     1 package couchdb.changes
     2 
     2 
       
     3 import akka.actor.{ActorRef, Actor, ActorLogging, Status}
       
     4 import akka.io.IO
       
     5 
       
     6 import scala.concurrent.ExecutionContext.Implicits.global
       
     7 import scala.language.postfixOps
       
     8 import scala.language.reflectiveCalls
       
     9 import scala.util.Try
       
    10 
       
    11 import spray.can.Http
       
    12 import spray.can.Http.ConnectionAttemptFailedException
       
    13 import spray.client.pipelining._
     3 import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
    14 import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
     4 import spray.http._
    15 import spray.http._
     5 import spray.json._
    16 import spray.json._
     6 import spray.client.pipelining._
       
     7 import akka.actor.{ActorRef, Actor, ActorLogging}
       
     8 import scala.util.Try
       
     9 import spray.can.Http
       
    10 import akka.io.IO
       
    11 
    17 
    12 case class ResultRow(seq: Int, id: String, doc: JsObject)
    18 case class ResultRow(seq: Int, id: String, doc: JsObject)
    13 
    19 
    14 trait ChangesMarshaller {
    20 trait ChangesMarshaller {
    15 
    21 
    23 
    29 
    24   }
    30   }
    25 }
    31 }
    26 
    32 
    27 class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
    33 class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
    28   with ChangesMarshaller {
    34   with ChangesMarshaller with settings {
    29 
    35 
    30   val io = IO(Http)(context.system)
    36   val io = IO(Http)(context.system)
    31 
    37 
       
    38   def config = context.system.settings.config
       
    39 
    32   def receive: Receive = {
    40   def receive: Receive = {
    33     case uri: Uri =>
    41     case Start =>
       
    42       val uri = Uri(s"${couchConf.url}/_changes").withQuery(
       
    43           "feed" -> "continuous"
       
    44         , "heartbeat" -> couchConf.heartbeat.toMillis.toString
       
    45         , "include_docs" -> "true"
       
    46       )
    34       val rq = HttpRequest(HttpMethods.GET, uri = uri)
    47       val rq = HttpRequest(HttpMethods.GET, uri = uri)
    35       log.debug("Sending request {}", rq)
    48       log.debug("Sending request {}", rq)
    36       sendTo(io).withResponsesReceivedBy(self)(rq)
    49       sendTo(io).withResponsesReceivedBy(self)(rq)
    37     case x@ChunkedResponseStart(_) =>
    50     case x@ChunkedResponseStart(_) =>
    38       log.debug("Received {}", x)
    51       log.debug("Received {}", x)
    41       if (data.length > 2) // skip heartbeat messages
    54       if (data.length > 2) // skip heartbeat messages
    42         ChangesMarshaller(data).fold(
    55         ChangesMarshaller(data).fold(
    43           _ => log.warning("Failed to deserialize entity {}", data),
    56           _ => log.warning("Failed to deserialize entity {}", data),
    44           processor !
    57           processor !
    45         )
    58         )
       
    59     case Status.Failure(t) =>
       
    60       log.error("Received failure {}", t)
       
    61       t match {
       
    62         case x:ConnectionAttemptFailedException =>
       
    63           log.info("Scheduling reconnect to couchdb in {}", couchConf.reconnect)
       
    64           context.system.scheduler.scheduleOnce(couchConf.reconnect, self,
       
    65             Start)
       
    66         case _ =>
       
    67       }
    46     case x =>
    68     case x =>
    47       log.debug("Received {}", x)
    69       log.debug("Received {}", x)
    48   }
    70   }
    49 }
    71 }
    50 
    72