官术网_书友最值得收藏!

SchedulerActor

The relevant parts of the code are displayed and explained. Now let us see how to obtain price data:

def constructUrl(exchange: String): String =
{
"https://min-api.cryptocompare.com/data/histominute?fsym=BTC&tsym=USD&limit=23&aggregate=1&e=" + exchange
}

ConstructUrl returns a completely formed URL for the request to the Cryptocompare API. More details are given in section related to the API:

final val predictionActor = system.actorOf(Props(new PredictionActor(configuration, db)))
final val traderActor = system.actorOf(Props(new TraderActor(ws)))

Creates instances of PredictionActor and TraderActor:

override def receive: Receive = {

The Receive method is defined in the actor trait and has to be implemented. It is triggered when someone passes a message to this actor (Scheduler in our case):

case _ =>
val futureResponse=restClient.getPayload(constructUrl(exchange))

In the preceding code, case _ => means that we react to any message of any type and content. The first thing that is done is an async call to the Cryptocompare API by the URL specified before. This is done with the help of RestClient, which returns Future with the response JSON. After receiving the response (inside futureResponse on complete callback), .json is mapped into the custom case class CryptoCompareResponse:

case class CryptoCompareResponse(Response: String, Type: Int, Aggregated: Boolean, Data: List[OHLC],     FirstValueInArray: Boolean, TimeTo: Long,TimeFrom: Long)

The case class is similar to POJO (Plain Old Java Object) without the need to write constructors and getters/setters:

object CryptoCompareResponse {
implicit val cryptoCompareResponseReads = Json.reads[CryptoCompareResponse]
}

This companion object is required for mapping JSON into this class. The CryptocompareResponse object stores the output of the API—a list of OHLC data, time range of data and others which that are not relevant to us. The OHLC class corresponds to actual price data:

case class OHLC(time: Long, open: Double, 
high: Double,
low: Double,
close: Double,
volumefrom: Double,
volumeto: Double)

After the data is ready, prices are stored in the DB by calling storePriceData(cryptoCompareResponse). At first, it does a batch insert (using Anorm's BatchSQL) into the PRICE_STAGING table and re-inserts into PRICE with deduplication with respect to timestamp, as we are receiving overlapping price data:

val batch = BatchSql(
"""|INSERT INTO PRICE_STAGING(TIMESTAMP,EXCHANGE,PRICE_OPEN,PRICE_CLOSED,VOLUME_BTC,
VOLUME_USD)| VALUES({timestamp}, {exchange}, {priceOpen}, {priceClosed}, {volumeBTC}, {volumeUSD})""".stripMargin,transformedPriceDta.head,transformedPriceDta.tail:_*)
val res: Array[Int] = batch.execute() // array of update count
val reInsert = SQL(
"""
|INSERT INTO PRICE(TIMESTAMP, EXCHANGE, PRICE_OPEN, PRICE_CLOSED, VOLUME_BTC, VOLUME_USD)
|SELECT TIMESTAMP, EXCHANGE, PRICE_OPEN, PRICE_CLOSED, VOLUME_BTC, VOLUME_USD
|FROM PRICE_STAGING AS s
|WHERE NOT EXISTS (
|SELECT *
|FROM PRICE As t
|WHERE t.TIMESTAMP = s.TIMESTAMP
|)
""".stripMargin).execute()
Logger.debug("reinsert " + reInsert)

After storing into the DB, SchedulerActor transforms OHLC data into (timestamp, delta) tuples, where delta is (closePrice-openPrice). So the format is suitable for the ML model. The transformed data is passed as a message to PredictionActor with explicit waiting for a response. This is done by using the ? operator. We ask the prediction actor:

(predictionActor ? CryptoCompareDTOToPredictionModelTransformer.tranform(cryptoCompareResponse)).mapTo[CurrentDataWithShortTermPrediction].map {

Its response is mapped to the CurrentDataWithShortTermPrediction class and passed to TraderActor using the ! operator. Unlike ?, the ! operator does not require a response:

predictedWithCurrent =>
traderActor ! predictedWithCurrent}

This was basic a walkthrough of SchedulerActor. We read data from the Cryptocompare API, store it into the database, send to PredictionActor and wait for its response. Then we forward its response to TraderActor.

Now let's see what happens inside PredictionActor.

主站蜘蛛池模板: 苏尼特左旗| 旬阳县| 武邑县| 沧源| 延庆县| 综艺| 蓬莱市| 泰和县| 双桥区| 九龙城区| 于都县| 江永县| 威海市| 南皮县| 南木林县| 石台县| 资阳市| 澄迈县| 酉阳| 曲沃县| 玉田县| 新民市| 化隆| 浙江省| 仁怀市| 视频| 刚察县| 乐至县| 五指山市| 清水县| 天柱县| 平安县| 西乌| 宣化县| 衢州市| 文水县| 博乐市| 普定县| 驻马店市| 遂溪县| 凤凰县|