TIKA Usage

TIKA

Tika Download

  1. server.jar
    http://tika.apache.org/download.html
    java -jar tika-server-1.17.jar
    

    Download the server build; it requires a Java runtime. Note: Java 9 no longer ships the xml.bind package the server needs by default, so it must be added separately; Java 8 works without issues.

  2. docker
    docker pull logicalspark/docker-tikaserver # only on initial download/update
    docker run --rm -p 9998:9998 logicalspark/docker-tikaserver
    
  3. app.jar
    The app jar also has a server mode, but it does not speak HTTP, so it cannot be exercised with curl.
    
  4. maven

Docker Server

  1. Test the server
    curl -X GET http://localhost:9998/tika 
    
  2. Get metadata
    curl -T test.pdf http://localhost:9998/meta --header "Accept: application/json"
    
  3. Extract document content
    curl -X PUT --data-binary @test.pdf http://localhost:9998/tika --header "Content-Type: application/pdf"
    curl -T test.pdf http://localhost:9998/tika --header "Accept: text/html" # returns HTML with markup; the Accept header is optional
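The same calls can be made from Python; a minimal stdlib sketch (the helper names and the local server URL are assumptions mirroring the curl examples above):

```python
import urllib.request

TIKA = "http://localhost:9998"  # assumed local tika-server, as above

def tika_endpoint(base, resource):
    """Join the server base URL and a resource path ('tika' or 'meta')."""
    return base.rstrip("/") + "/" + resource.lstrip("/")

def extract(path, resource="tika", accept="text/plain"):
    """PUT a file to the Tika server and return the extracted body."""
    with open(path, "rb") as f:
        req = urllib.request.Request(tika_endpoint(TIKA, resource),
                                     data=f.read(), method="PUT",
                                     headers={"Accept": accept})
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode("utf-8")
```

For example, `extract("test.pdf", resource="meta", accept="application/json")` mirrors the second curl call.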
    

Go Implementation

import (
	"io/ioutil"
	"net/http"
	"os"
)

var tikaServerUrl = "http://localhost:9998/"

// putRequest PUTs the file at filename to url and returns the response body.
func putRequest(url, filename string) (string, error) {
	file, err := os.Open(filename)
	if err != nil {
		return "", err
	}
	defer file.Close()
	req, err := http.NewRequest(http.MethodPut, url, file)
	if err != nil {
		return "", err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

Documentation

https://wiki.apache.org/tika/TikaJAXRS

Django CAS and OAuth2

CAS

Solution

  1. Use CAS as the authentication protocol.
  2. A acts as the primary authentication provider.
  3. A keeps the user database; the other systems (xxx/www) keep none, i.e. the provider implementation lives in A.
  4. Steps
    • xxx chooses to log in and is redirected to the LMS login page; CAS authenticates against the database, redirects back to xxx with a ticket in the URL, and a cookie is stored in the browser.
    • After receiving the ticket, xxx sends it back to CAS to verify its validity.
    • xxx grants the user access to internal resources.
  5. Code implementation
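The ticket-verification step of the flow above can be sketched like this; the /serviceValidate endpoint follows the CAS protocol convention, while the helper names and the success check are simplified assumptions:

```python
import urllib.parse
import urllib.request

def validate_url(cas_base, service, ticket):
    """Build the CAS serviceValidate URL for a given service and ticket."""
    query = urllib.parse.urlencode({"service": service, "ticket": ticket})
    return cas_base.rstrip("/") + "/serviceValidate?" + query

def validate(cas_base, service, ticket):
    """Ask the CAS server whether the ticket is valid for this service."""
    with urllib.request.urlopen(validate_url(cas_base, service, ticket)) as resp:
        body = resp.read().decode("utf-8")
    # A successful CAS 2/3 response contains <cas:authenticationSuccess>
    return "authenticationSuccess" in body
```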

Django code

Initialize a client project

django-admin startproject cas-client

Install Dependencies

pip install django-mama-cas # server
pip install django-cas-ng # client

Server

# settings.py
INSTALLED_APPS = (
    'mama_cas',
)

# Enable single logout (optional)
MAMA_CAS_ENABLE_SINGLE_SIGN_OUT = True

# Important! SERVICE is the client's address; this is a list, so more SERVICE HOST:PORT entries can be appended.
MAMA_CAS_SERVICES = [
    {
        'SERVICE': 'http://127.0.1.1:8000',
        'CALLBACKS': [
            'mama_cas.callbacks.user_model_attributes',     # returns every user field except password
            # 'mama_cas.callbacks.user_name_attributes',    # returns only username
        ],
        'LOGOUT_ALLOW': True,
        'LOGOUT_URL': 'http://127.0.1.1:8000/accounts/callback',
    },
]

# urls.py
url(r'', include('mama_cas.urls')),

Don't forget:

python3 manage.py migrate

Client

# settings.py
INSTALLED_APPS = (
    # ... other installed apps
    'django_cas_ng',
)

AUTHENTICATION_BACKENDS = (
    'django.contrib.auth.backends.ModelBackend',
    'django_cas_ng.backends.CASBackend',
)

# i.e. the LMS address
CAS_SERVER_URL = 'http://127.0.0.1:8000'
CAS_VERSION = '3'

# Store every user attribute returned by the CAS server.
CAS_APPLY_ATTRIBUTES_TO_USER = True

# urls.py
import django_cas_ng.views as cas_views
url(r'^accounts/login$', cas_views.login, name='cas_ng_login'),
url(r'^accounts/logout$', cas_views.logout, name='cas_ng_logout'),
url(r'^accounts/callback$', cas_views.callback, name='cas_ng_proxy_callback'),

Also:

python3 manage.py migrate

Usage flow

  1. The user chooses to log in on the client; the backend redirects to the server's /accounts/login.
  2. After authentication succeeds, a login cookie is set under the client's host, the user is logged in, and is redirected to the client's home page.
  3. The user chooses to log out on the client; the backend redirects to the server's /accounts/logout.

Caveats

  • The server and the client must not share the same host, or a 500 internal error occurs, because the cookie has to be written back under the client's host.
  • For local testing, when the client runs on 127.0.1.1:8000, add that IP to ALLOWED_HOSTS in settings.py.
  • The client must handle the empty route, since the CAS server redirects back to the root URL after authentication.
  • Logout on the client also goes through the CAS server, so single sign-out must be enabled on the CAS server.

Building an authentication system with OAuth2 and CAS

  • On the CAS page, the user clicks "Log in with GitHub"; the state parameter is used to carry the current page's service parameter.
  • After the user confirms, GitHub returns the state; fetch the user data and redirect to a handler.
  • Log the user in, send the ticket, and redirect to the service.
  • The two requests are not the same request, so the service URL cannot be kept in a session or cookie.
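Since the two requests are distinct, the service URL has to travel through the OAuth2 state parameter itself; one way to do that (base64url is an assumed encoding here, any reversible scheme works):

```python
import base64

def pack_state(service_url):
    """Encode the CAS service URL into an OAuth2 state value."""
    return base64.urlsafe_b64encode(service_url.encode("utf-8")).decode("ascii")

def unpack_state(state):
    """Recover the service URL from the state echoed back by the provider."""
    return base64.urlsafe_b64decode(state.encode("ascii")).decode("utf-8")
```

The handler that receives the provider's callback calls `unpack_state(state)` to know which service to redirect the ticket to. Note that a production setup would also sign the state to prevent tampering.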

InfluxDB: a time-series database

Advanced

Archiving

  • retention policy: create a two-week retention policy to store the 10-minute averages for archiving.
    CREATE RETENTION POLICY "two_week" ON "house" DURATION 2w REPLICATION 1
    

    HTTP

    https://twinpines-9429794e.influxcloud.net:8086/query?u=xiaogu&p=123q456w&db=house_data&pretty=true&q=CREATE%20RETENTION%20POLICY%20%22test%22%20ON%20%22house_data%22%20DURATION%201d%20REPLICATION%201
    
  • continuous query
    1. Switch to the database
      use house
      
    2. Create the continuous query
  • Results can be grouped by tag; group by * means every tag
    create continuous query "cq_5m" on "house" begin select mean("Voltage") as "mean_voltage" into "two_week"."house_two_week_tags_test" from "house" group by time(5m), * end
    

    Note: cq_5m is the continuous query's name and house the database; in the into clause, two_week is the retention policy on house and house_two_week_tags_test the destination measurement; the source measurement is house; group by time(5m) archives once every 5 minutes.

User management

  1. "CREATE USER" user_name "WITH PASSWORD" password [ "WITH ALL PRIVILEGES" ] .
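A sketch rendering the grammar above into a concrete statement (the helper is illustrative; quoting follows InfluxQL conventions: double quotes around identifiers, single quotes around the password):

```python
def create_user_ql(name, password, admin=False):
    """Render an InfluxQL CREATE USER statement from the grammar above."""
    stmt = 'CREATE USER "%s" WITH PASSWORD \'%s\'' % (name, password)
    if admin:
        stmt += " WITH ALL PRIVILEGES"
    return stmt
```

The resulting string can be sent through the same /query HTTP endpoint used for the retention-policy example earlier.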

Kapacitor: scriptable alerting

Writing TICKscript for Kapacitor

kapacitor define house -tick house.tick -type stream -dbrp house.autogen `type can also be batch`
kapacitor show house `show the task's details`
kapacitor enable house
# house.tick
stream
    |from()
        .measurement('house')
    |eval(lambda: float("Global_active_power") * 1000.0 / 60.0 - float("Sub_metering_1") - float("Sub_metering_2") - float("Sub_metering_3"))
        .as('consumer')
    |influxDBOut()
        .database('house')
        .retentionPolicy('autogen')
        .measurement('house')

Notes

  • InfluxDB and Kapacitor are linked by a subscription, so if they sit in different data centers, set hostname in Kapacitor's config file; inspect subscriptions with show subscriptions in InfluxDB.
  • Kapacitor can record test data and replay it to check that a script executes correctly.

Driving Kapacitor the way Chronograf does

  • Script list.tick
    ```js
    var data = stream
        |from()
            .database('house_data')
            .retentionPolicy('autogen')
            .measurement('house')
            .where(lambda: ("metric" == 'ljz'))
        |eval(lambda: "deviation")
            .as('value')

    var trigger = data
        |alert()
            .crit(lambda: "value" > 4)
            .message('Alert: power consumption')
            .id('Power_Consumption_Rule' + ':')
            .idTag('alertID')
            .levelTag('level')
            .messageField('message')
            .durationField('duration')
            .details('The error of power consumption is greater than the threshold(6Wh)')
            .email('lee_jiazh@163.com', 'qingquan@xiaogu-tech.com')

    trigger
        |influxDBOut()
            .create()
            .database('httpTest')
            .retentionPolicy('autogen')
            .measurement('httptest')
            .tag('alertName', 'test_http')
            .tag('triggerType', 'threshold')

    trigger
        |httpPost('https://xg-grafana.herokuapp.com/api/test')
    ```

Behavior: POST an HTTP request to the target, and when the alert triggers also write the data to InfluxDB.
1. InfluxDB data

| time | alertID | alertName | duration | level | message | metric | triggerType | value |
| ---- | ------- | --------- | -------- | ----- | ------- | ------ | ----------- | ----- |
| 1511323486959072129 | Power_Consumption_Rule:nil | test_http | 0 | CRITICAL | Alert: power consumption | ljz | threshold | 18.4 |
| 1511323498954461470 | Power_Consumption_Rule:nil | test_http | 11995389341 | CRITICAL | Alert: power consumption | ljz | threshold | 27.96666666666667 |

2. HTTP POSTed data:
```json
{
  "series": [
    {
      "name": "house",
      "tags": {
        "alertID": "Power_Consumption_Rule:nil",
        "level": "CRITICAL",
        "metric": "ljz"
      },
      "columns": ["time", "duration", "message", "value"],
      "values": [
        ["2017-11-22T05:14:26.541257857Z",
         1344412905118,
         "Alert: power consumption",
         20.733333333333334]
      ]
    }
  ],
  "_id": "3oTa37s9Uva9swx5"
}
```

Tips

  • To trigger only when the state changes, add .stateChangesOnly() under alert().

UDF in practice

  1. Write a Python 2 script named ttest.py and put it in /tmp/kapacitor_udf.
  2. Clone the repository that contains the required python agent package from GitHub.
    git clone https://github.com/influxdata/kapacitor.git /tmp/kapacitor_udf/kapacitor
    
  3. Edit the config file (/etc/kapacitor/kapacitor.conf): point PYTHONPATH at the agent's Python files inside the cloned repository.
    [udf]
    [udf.functions]
     [udf.functions.tTest]
         # Run python
         prog = "/usr/bin/python2"
         # Pass args to python
         # -u for unbuffered STDIN and STDOUT
         # and the path to the script
         args = ["-u", "/tmp/kapacitor_udf/ttest.py"]
         # If the python process is unresponsive for 10s kill it
         timeout = "10s"
         # Define env vars for the process, in this case the PYTHONPATH
         [udf.functions.tTest.env]
             PYTHONPATH = "/tmp/kapacitor_udf/kapacitor/udf/agent/py"
    
  4. Install the environment and packages the agent depends on
    apt install python-pip
    python2 -m pip install six scipy

Install protocol buffers

wget https://github.com/google/protobuf/releases/download/v3.5.0/protobuf-all-3.5.0.zip
unzip -o protobuf-all-3.5.0.zip
cd protobuf-3.5.0/
./configure
make
make install
export LD_LIBRARY_PATH="/usr/local/lib"

Check that the protoc command is available to confirm the install succeeded.

Install the Python protobuf bindings

git clone https://github.com/google/protobuf.git
cd protobuf/python/
python2 setup.py build
python2 setup.py install

5. Restart kapacitor
6. Write a TICKscript
```js
// This TICKscript monitors the three temperatures for a 3d printing job,
// and triggers alerts if the temperatures start to experience abnormal behavior.

// Define our desired significance level.
var alpha = 0.001

// Select the temperatures measurements
var data = stream
    |from()
        .measurement('temperatures')
    |window()
        .period(5m)
        .every(5m)

data
    //Run our tTest UDF on the hotend temperature
    @tTest()
        // specify the hotend field
        .field('hotend')
        // Keep a 1h rolling window
        .size(3600)
        // pass in the alpha value
        .alpha(alpha)
    |alert()
        .id('hotend')
        .crit(lambda: "pvalue" < alpha)
        .log('/tmp/kapacitor_udf/hotend_failure.log')

// Do the same for the bed and air temperature.
data
    @tTest()
        .field('bed')
        .size(3600)
        .alpha(alpha)
    |alert()
        .id('bed')
        .crit(lambda: "pvalue" < alpha)
        .log('/tmp/kapacitor_udf/bed_failure.log')

data
    @tTest()
        .field('air')
        .size(3600)
        .alpha(alpha)
    |alert()
        .id('air')
        .crit(lambda: "pvalue" < alpha)
        .log('/tmp/kapacitor_udf/air_failure.log')
```

  7. Write a script that generates the test data:

```python
#!/usr/bin/python2

from numpy import random
from datetime import timedelta, datetime
import sys
import time
import requests

# Target temperatures in C
hotend_t = 220
bed_t = 90
air_t = 70

# Connection info
write_url = 'http://localhost:9092/write?db=printer&rp=autogen&precision=s'
measurement = 'temperatures'

def temp(target, sigma):
    """Pick a random temperature from a normal distribution
    centered on the target temperature."""
    return random.normal(target, sigma)

def main():
    hotend_sigma = 0
    bed_sigma = 0
    air_sigma = 0
    hotend_offset = 0
    bed_offset = 0
    air_offset = 0

    # Define some anomalies by changing sigma at certain times
    # list of sigma values to start at a specified iteration
    hotend_anomalies = [
        (0, 0.5, 0),       # normal sigma
        (3600, 3.0, -1.5), # at one hour the hotend goes bad
        (3900, 0.5, 0),    # 5 minutes later recovers
    ]
    bed_anomalies = [
        (0, 1.0, 0),       # normal sigma
        (28800, 5.0, 2.0), # at 8 hours the bed goes bad
        (29700, 1.0, 0),   # 15 minutes later recovers
    ]
    air_anomalies = [
        (0, 3.0, 0),         # normal sigma
        (10800, 5.0, 0),     # at 3 hours air starts to fluctuate more
        (43200, 15.0, -5.0), # at 12 hours air goes really bad
        (45000, 5.0, 0),     # 30 minutes later recovers
        (72000, 3.0, 0),     # at 20 hours goes back to normal
    ]

    # Start from 2016-01-01 00:00:00 UTC
    # This makes it easy to reason about the data later
    now = datetime(2016, 1, 1)
    second = timedelta(seconds=1)
    epoch = datetime(1970, 1, 1)

    # 24 hours of temperatures once per second
    points = []
    for i in range(60*60*24+2):
        # update sigma values
        if len(hotend_anomalies) > 0 and i == hotend_anomalies[0][0]:
            hotend_sigma = hotend_anomalies[0][1]
            hotend_offset = hotend_anomalies[0][2]
            hotend_anomalies = hotend_anomalies[1:]

        if len(bed_anomalies) > 0 and i == bed_anomalies[0][0]:
            bed_sigma = bed_anomalies[0][1]
            bed_offset = bed_anomalies[0][2]
            bed_anomalies = bed_anomalies[1:]

        if len(air_anomalies) > 0 and i == air_anomalies[0][0]:
            air_sigma = air_anomalies[0][1]
            air_offset = air_anomalies[0][2]
            air_anomalies = air_anomalies[1:]

        # generate temps
        hotend = temp(hotend_t+hotend_offset, hotend_sigma)
        bed = temp(bed_t+bed_offset, bed_sigma)
        air = temp(air_t+air_offset, air_sigma)
        points.append("%s hotend=%f,bed=%f,air=%f %d" % (
            measurement,
            hotend,
            bed,
            air,
            (now - epoch).total_seconds(),
        ))
        now += second

    # Write data to Kapacitor
    r = requests.post(write_url, data='\n'.join(points))
    if r.status_code != 204:
        print >> sys.stderr, r.text
        return 1
    return 0

if __name__ == '__main__':
    exit(main())


```

8. Run it

```bash
kapacitor define print_temps -type stream -dbrp printer.autogen -tick print_temps.tick
kapacitor enable print_temps
cat /tmp/kapacitor_udf/{hotend,bed,air}_failure.log
```
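The tTest UDF used above boils down to a two-sample t-test between the rolling window and the newest points. As a rough illustration, Welch's t statistic in pure Python (a sketch, not the UDF's actual code, which uses scipy):

```python
import math

def welch_t(xs, ys):
    """Welch's t statistic for two independent samples with unequal variances."""
    nx, ny = len(xs), len(ys)
    mx, my = sum(xs) / float(nx), sum(ys) / float(ny)
    vx = sum((x - mx) ** 2 for x in xs) / (nx - 1)  # sample variances
    vy = sum((y - my) ** 2 for y in ys) / (ny - 1)
    return (mx - my) / math.sqrt(vx / nx + vy / ny)
```

A large |t| (equivalently a small p-value, as in `.crit(lambda: "pvalue" < alpha)`) means the recent window's mean has drifted away from the historical one.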

Django OAuth2

OAuth2 with Django

Part I

  • Install
    pip install django-oauth-toolkit django-cors-middleware
    
  • source/config/settings.py
    ```python
    INSTALLED_APPS += (
        'django_extensions',
        'debug_toolbar',
        'oauth2_provider',
        'corsheaders',
    )

    MIDDLEWARE += (
        'debug_toolbar.middleware.DebugToolbarMiddleware',
        'corsheaders.middleware.CorsMiddleware',
    )
    CORS_ORIGIN_ALLOW_ALL = True

    AUTHENTICATION_BACKENDS = (
        'django.contrib.auth.backends.ModelBackend',
        'oauth2_provider.backends.OAuth2Backend',
    )
    ```
  • source/config/urls.py
    ```python
    url(r'^o/', include('oauth2_provider.urls', namespace='oauth2_provider')),
    ```
  • Migrate
    python source/manage.py migrate
    

Part II

  • Register your application
    http://localhost:8000/o/applications/ 
    
  • Information
    *Client id* and *Client Secret* are automatically generated;
    ID = kzsUOB12CcXKkSZfG90dLcons33daYYqafjBUdAw
    secret = qN7MiCfG9cUpo2kAaj0lDkbpNBBgHKlIiO3GeeGS2dQkxwFUVS2NpWlPVCiVCpmnEjTLYtIPWBJwubWGB3SqF4fKypqsyVCCvX5DebaQW82shdyQIH96lWcPWFKYUtBj
    

Part III

Note: there are two apps in what follows. A is the provider (the side granting authorization), on port 8000; B is the requester, on port 8082.
  1. Request B at http://127.0.0.1:8082 and configure the redirect URL.
  2. Redirect to A at http://127.0.0.1:8000/o/authorize/?state=xx&client_id=xx&response_type=code
  3. The browser issues the request; A checks the login state (authorize is login_required, so it verifies login first), then shows the authorization page.
  4. Per the pre-registered settings, A redirects to the specified URL.
  5. At that URL, receive code and state; the code is the authorization_code.
  6. Request an access token from A on port 8000: http://127.0.0.1:8000/o/token/?code=xx&redirect_uri=xx&grant_type=authorization_code&client_id=xx, noting that this must be a POST.
    // response
    {
     "refresh_token": "k6hT71TZIsPIQYVJqxlzcoN6j6k306",
     "access_token": "kltBlehmZbSJ9yCWpBv5f8St1oqLhu",
     "expires_in": 36000,
     "token_type": "Bearer",
     "scope": "write read"
    }
    
  7. Access A's URLs with the access token in the request headers (django-oauth-toolkit expects Authorization: Bearer <token>); A then queries its database and returns the data B needs.
  8. To customize the authorization page, add an oauth2_provider directory under templates, create authorize.html there, and extend base.html on its first line.
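Step 6 above (exchanging the code for a token), sketched with the standard library; the URL and credentials are the walkthrough's placeholders, not real values:

```python
import json
import urllib.parse
import urllib.request

TOKEN_URL = "http://127.0.0.1:8000/o/token/"  # provider A, as above

def token_request_body(code, redirect_uri, client_id, client_secret):
    """Form-encode the authorization_code exchange (it must be POSTed)."""
    return urllib.parse.urlencode({
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": redirect_uri,
        "client_id": client_id,
        "client_secret": client_secret,
    }).encode("ascii")

def exchange_code(code, redirect_uri, client_id, client_secret):
    """POST the code to the provider and return the token payload as a dict."""
    req = urllib.request.Request(
        TOKEN_URL,
        data=token_request_body(code, redirect_uri, client_id, client_secret),
        method="POST")
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

The returned dict has the shape shown in the // response block above (access_token, refresh_token, expires_in, token_type, scope).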

    Part IV

    • Google OAuth2
      pip install  oauth2client
      
    • Before Starting: register an account in the Google developer console, enable the API for the required scope, and obtain the client ID.
    • build a flow
      from oauth2client.client import OAuth2WebServerFlow
      flow = OAuth2WebServerFlow(client_id='xxx',
                           client_secret='X1nUWXLi9UxMk-rGj0oWohfb',
                           # scope='https://www.googleapis.com/auth/plus.login',
                           scope="https://www.googleapis.com/auth/userinfo.email",
                           redirect_uri='http://localhost:8002/user/google_token/',
                           authuser=-1,)
      
    • First Step
      auth_uri = flow.step1_get_authorize_url()
      return redirect(auth_uri)
      
  • Second Step
    response = requests.post("https://www.googleapis.com/oauth2/v4/token",
                             data={'code': code,
                                   'redirect_uri': 'http://localhost:8002/user/google_token/',
                                   'grant_type': 'authorization_code',
                                   'client_id': '769145275139-ano9e5tp62s656jta058176o36pv3qpf.apps.googleusercontent.com',
                                   'client_secret': 'X1nUWXLi9UxMk-rGj0oWohfb'}).json()
    res = requests.get("https://www.googleapis.com/oauth2/v2/userinfo?access_token=" + response["access_token"]).json()
    
  • Warning: the refresh_token is only returned on the first authorization…

inch: an InfluxDB benchmarking tool

| series | memory |
| ------ | ------ |
| 1000*2 | 150MB |
| 1000*20 | 200MB |
| 1000*200 | 900MB |
| 1000*500 | 1.9GB |
| 1000*1000 | 3.0GB |
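The table suggests memory grows roughly linearly in series count above a fixed baseline; a quick check of the marginal cost (numbers taken from the table, the linear model is an assumption):

```python
def marginal_mb_per_series(rows):
    """Slope in MB/series between the first and last (series, MB) rows."""
    (s0, m0), (s1, m1) = rows[0], rows[-1]
    return (m1 - m0) / (s1 - s0)

# (series count, memory in MB) from the table above
rows = [(1000 * 2, 150), (1000 * 20, 200), (1000 * 200, 900),
        (1000 * 500, 1900), (1000 * 1000, 3000)]
slope_mb = marginal_mb_per_series(rows)  # roughly 3 KB per additional series
```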

IoT

inch -report-host http://localhost:8086 -v -p 30 -t "1000,2" -f 100
  • Concurrency=1 (200 nodes sending data)
  • BatchSize=5000
  • FieldNum=100
  • Series=1000*2
  • Points=3000

    Results

  • Disk: 1.9GB
  • Runtime: 2358.7 seconds
  • (2543.8 pt/sec 254377.9 val/sec) errors: 0 μ: 1.852564663s, 90%: 3.474647015s, 95%: 3.947272979s, 99%: 5.192380764s

Financial-data test

inch -report-host http://localhost:8086 -v -b 200000 -t "10000,20" -c 1 -p 720
  • Concurrency=1
  • BatchSize=200K
  • FieldNum=1
  • Series=10K*20
  • Points=720 (one point per minute, 12 hours of data)

    Results

  • Disk: 1.2GB
  • Runtime: 1201.3 seconds
  • (119875.1 pt/sec 119875.1 val/sec) errors: 0 μ: 1.645184386s, 90%: 2.633023203s, 95%: 2.993008777s, 99%: 3.889885468s

Simulating Danone

inch -host https://twinpines-9429794e.influxcloud.net:8086 -password 123q456w -user xiaogu -v -b 1000 -c 20 -f 1 -t "1000,20" -p 100
  • Results
    ```
    T=00000037 2000000 points written (53287.3 pt/sec | 53287.3 val/sec) errors: 0 | μ: 367.216902ms, 90%: 556.65386ms, 95%: 765.484122ms, 99%: 1.674279231s

    Total time: 37.5 seconds
    ```

# command to check the on-disk data size
```bash
sudo du -s -h /var/lib/influxdb/data/